This is an automated email from the ASF dual-hosted git repository.
jchan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new eff073ff119 [FLINK-34313][doc][table] Add document for session window tvf
eff073ff119 is described below
commit eff073ff1199acf0f26a0f04ede7d692837301c3
Author: Xuyang <[email protected]>
AuthorDate: Sun Feb 4 23:24:54 2024 +0800
[FLINK-34313][doc][table] Add document for session window tvf
This closes #24250
---
.../docs/dev/table/sql/queries/window-agg.md | 124 +++++++-----
.../docs/dev/table/sql/queries/window-join.md | 4 +
.../docs/dev/table/sql/queries/window-topn.md | 4 +
.../docs/dev/table/sql/queries/window-tvf.md | 208 +++++++++++++++++----
docs/content.zh/docs/dev/table/tuning.md | 2 +-
.../docs/dev/table/sql/queries/window-agg.md | 124 +++++++-----
.../docs/dev/table/sql/queries/window-join.md | 4 +
.../docs/dev/table/sql/queries/window-topn.md | 4 +
.../docs/dev/table/sql/queries/window-tvf.md | 203 ++++++++++++++++----
docs/content/docs/dev/table/tuning.md | 2 +-
10 files changed, 504 insertions(+), 175 deletions(-)
diff --git a/docs/content.zh/docs/dev/table/sql/queries/window-agg.md b/docs/content.zh/docs/dev/table/sql/queries/window-agg.md
index f9395745202..ced9016bf41 100644
--- a/docs/content.zh/docs/dev/table/sql/queries/window-agg.md
+++ b/docs/content.zh/docs/dev/table/sql/queries/window-agg.md
@@ -40,11 +40,15 @@ GROUP BY window_start, window_end, ...
### 窗口表值函数
-Flink 支持在 `TUMBLE`, `HOP` 和 `CUMULATE` 上进行窗口聚合。
+Flink 支持在 `TUMBLE`,`HOP`,`CUMULATE` 和 `SESSION` 上进行窗口聚合。
在流模式下,窗口表值函数的时间属性字段必须是 [事件时间或处理时间]({{< ref "docs/dev/table/concepts/time_attributes" >}})。关于窗口函数更多信息,参见 [Windowing TVF]({{< ref "docs/dev/table/sql/queries/window-tvf" >}})。
在批模式下,窗口表值函数的时间属性字段必须是 `TIMESTAMP` 或 `TIMESTAMP_LTZ` 类型的。
-这里有关于 `TUMBLE`,`HOP` 和 `CUMULATE` 窗口聚合的几个例子:
+{{< hint info >}}
+注意:`SESSION` 窗口聚合目前不支持批模式。
+{{< /hint >}}
+
+这里有关于 `TUMBLE`,`HOP`,`CUMULATE` 和 `SESSION` 窗口聚合的几个例子:
```sql
-- tables must have time attribute, e.g. `bidtime` in this table
@@ -71,48 +75,74 @@ Flink SQL> SELECT * FROM Bid;
+------------------+-------+------+-------------+
-- tumbling window aggregation
-Flink SQL> SELECT window_start, window_end, SUM(price)
+Flink SQL> SELECT window_start, window_end, SUM(price) AS total_price
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
-+------------------+------------------+-------+
-| window_start | window_end | price |
-+------------------+------------------+-------+
-| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
-| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
-+------------------+------------------+-------+
++------------------+------------------+-------------+
+| window_start | window_end | total_price |
++------------------+------------------+-------------+
+| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
+| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
++------------------+------------------+-------------+
-- hopping window aggregation
-Flink SQL> SELECT window_start, window_end, SUM(price)
+Flink SQL> SELECT window_start, window_end, SUM(price) AS total_price
FROM TABLE(
HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
-+------------------+------------------+-------+
-| window_start | window_end | price |
-+------------------+------------------+-------+
-| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
-| 2020-04-15 08:05 | 2020-04-15 08:15 | 15.00 |
-| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
-| 2020-04-15 08:15 | 2020-04-15 08:25 | 6.00 |
-+------------------+------------------+-------+
++------------------+------------------+-------------+
+| window_start | window_end | total_price |
++------------------+------------------+-------------+
+| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
+| 2020-04-15 08:05 | 2020-04-15 08:15 | 15.00 |
+| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
+| 2020-04-15 08:15 | 2020-04-15 08:25 | 6.00 |
++------------------+------------------+-------------+
-- cumulative window aggregation
-Flink SQL> SELECT window_start, window_end, SUM(price)
+Flink SQL> SELECT window_start, window_end, SUM(price) AS total_price
FROM TABLE(
CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
-+------------------+------------------+-------+
-| window_start | window_end | price |
-+------------------+------------------+-------+
-| 2020-04-15 08:00 | 2020-04-15 08:06 | 4.00 |
-| 2020-04-15 08:00 | 2020-04-15 08:08 | 6.00 |
-| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
-| 2020-04-15 08:10 | 2020-04-15 08:12 | 3.00 |
-| 2020-04-15 08:10 | 2020-04-15 08:14 | 4.00 |
-| 2020-04-15 08:10 | 2020-04-15 08:16 | 4.00 |
-| 2020-04-15 08:10 | 2020-04-15 08:18 | 10.00 |
-| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
-+------------------+------------------+-------+
++------------------+------------------+-------------+
+| window_start | window_end | total_price |
++------------------+------------------+-------------+
+| 2020-04-15 08:00 | 2020-04-15 08:06 | 4.00 |
+| 2020-04-15 08:00 | 2020-04-15 08:08 | 6.00 |
+| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
+| 2020-04-15 08:10 | 2020-04-15 08:12 | 3.00 |
+| 2020-04-15 08:10 | 2020-04-15 08:14 | 4.00 |
+| 2020-04-15 08:10 | 2020-04-15 08:16 | 4.00 |
+| 2020-04-15 08:10 | 2020-04-15 08:18 | 10.00 |
+| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
++------------------+------------------+-------------+
+
+-- session window aggregation with partition keys
+Flink SQL> SELECT window_start, window_end, supplier_id, SUM(price) AS total_price
+ FROM TABLE(
+ SESSION(TABLE Bid PARTITION BY supplier_id, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES))
+ GROUP BY window_start, window_end, supplier_id;
++------------------+------------------+-------------+-------------+
+| window_start | window_end | supplier_id | total_price |
++------------------+------------------+-------------+-------------+
+| 2020-04-15 08:05 | 2020-04-15 08:09 | supplier1 | 6.00 |
+| 2020-04-15 08:09 | 2020-04-15 08:13 | supplier2 | 8.00 |
+| 2020-04-15 08:13 | 2020-04-15 08:15 | supplier1 | 1.00 |
+| 2020-04-15 08:17 | 2020-04-15 08:19 | supplier2 | 6.00 |
++------------------+------------------+-------------+-------------+
+
+-- session window aggregation without partition keys
+Flink SQL> SELECT window_start, window_end, SUM(price) AS total_price
+ FROM TABLE(
+ SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES))
+ GROUP BY window_start, window_end;
++------------------+------------------+-------------+
+| window_start | window_end | total_price |
++------------------+------------------+-------------+
+| 2020-04-15 08:05 | 2020-04-15 08:15 | 15.00 |
+| 2020-04-15 08:17 | 2020-04-15 08:19 | 6.00 |
++------------------+------------------+-------------+
```
*注意: 为了更好地理解窗口行为,这里把 timestamp 值后面的 0 去掉了,例如:在 Flink SQL Client 中,如果类型是
`TIMESTAMP(3)`,`2020-04-15 08:05` 应该显示成 `2020-04-15 08:05:00.000`。*
@@ -124,20 +154,20 @@ Flink SQL> SELECT window_start, window_end, SUM(price)
`GROUPING SETS` 窗口聚合中 `GROUP BY` 子句必须包含 `window_start` 和 `window_end` 列,但
`GROUPING SETS` 子句中不能包含这两个字段。
```sql
-Flink SQL> SELECT window_start, window_end, supplier_id, SUM(price) as price
+Flink SQL> SELECT window_start, window_end, supplier_id, SUM(price) AS total_price
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, GROUPING SETS ((supplier_id), ());
-+------------------+------------------+-------------+-------+
-| window_start | window_end | supplier_id | price |
-+------------------+------------------+-------------+-------+
-| 2020-04-15 08:00 | 2020-04-15 08:10 | (NULL) | 11.00 |
-| 2020-04-15 08:00 | 2020-04-15 08:10 | supplier2 | 5.00 |
-| 2020-04-15 08:00 | 2020-04-15 08:10 | supplier1 | 6.00 |
-| 2020-04-15 08:10 | 2020-04-15 08:20 | (NULL) | 10.00 |
-| 2020-04-15 08:10 | 2020-04-15 08:20 | supplier2 | 9.00 |
-| 2020-04-15 08:10 | 2020-04-15 08:20 | supplier1 | 1.00 |
-+------------------+------------------+-------------+-------+
++------------------+------------------+-------------+-------------+
+| window_start | window_end | supplier_id | total_price |
++------------------+------------------+-------------+-------------+
+| 2020-04-15 08:00 | 2020-04-15 08:10 | (NULL) | 11.00 |
+| 2020-04-15 08:00 | 2020-04-15 08:10 | supplier2 | 5.00 |
+| 2020-04-15 08:00 | 2020-04-15 08:10 | supplier1 | 6.00 |
+| 2020-04-15 08:10 | 2020-04-15 08:20 | (NULL) | 10.00 |
+| 2020-04-15 08:10 | 2020-04-15 08:20 | supplier2 | 9.00 |
+| 2020-04-15 08:10 | 2020-04-15 08:20 | supplier1 | 1.00 |
++------------------+------------------+-------------+-------------+
```
`GROUPING SETS` 的每个子列表可以是:空的,多列或表达式,它们的解释方式和直接使用 `GROUP BY` 子句是一样的。一个空的
Grouping Sets 表示所有行都聚合在一个分组下,即使没有数据,也会输出结果。
@@ -153,7 +183,7 @@ Flink SQL> SELECT window_start, window_end, supplier_id, SUM(price) as price
例如:下面这个查询和上个例子中的效果是一样的。
```sql
-SELECT window_start, window_end, supplier_id, SUM(price) as price
+SELECT window_start, window_end, supplier_id, SUM(price) AS total_price
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, ROLLUP (supplier_id);
@@ -168,12 +198,12 @@ GROUP BY window_start, window_end, ROLLUP (supplier_id);
例如:下面两个查询是等效的。
```sql
-SELECT window_start, window_end, item, supplier_id, SUM(price) as price
+SELECT window_start, window_end, item, supplier_id, SUM(price) AS total_price
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, CUBE (supplier_id, item);
-SELECT window_start, window_end, item, supplier_id, SUM(price) as price
+SELECT window_start, window_end, item, supplier_id, SUM(price) AS total_price
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, GROUPING SETS (
@@ -200,13 +230,13 @@ SELECT window_start, window_end, item, supplier_id, SUM(price) as price
-- tumbling 5 minutes for each supplier_id
CREATE VIEW window1 AS
-- Note: The window start and window end fields of inner Window TVF are optional in the select clause. However, if they appear in the clause, they need to be aliased to prevent name conflicting with the window start and window end of the outer Window TVF.
-SELECT window_start as window_5mintumble_start, window_end as window_5mintumble_end, window_time as rowtime, SUM(price) as partial_price
+SELECT window_start AS window_5mintumble_start, window_end AS window_5mintumble_end, window_time AS rowtime, SUM(price) AS partial_price
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
GROUP BY supplier_id, window_start, window_end, window_time;
-- tumbling 10 minutes on the first window
-SELECT window_start, window_end, SUM(partial_price) as total_price
+SELECT window_start, window_end, SUM(partial_price) AS total_price
FROM TABLE(
TUMBLE(TABLE window1, DESCRIPTOR(rowtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
diff --git a/docs/content.zh/docs/dev/table/sql/queries/window-join.md b/docs/content.zh/docs/dev/table/sql/queries/window-join.md
index e026e45f4bf..f73ad316c23 100644
--- a/docs/content.zh/docs/dev/table/sql/queries/window-join.md
+++ b/docs/content.zh/docs/dev/table/sql/queries/window-join.md
@@ -31,6 +31,10 @@ under the License.
通常,窗口关联和 [窗口表值函数]({{< ref "docs/dev/table/sql/queries/window-tvf" >}})
一起使用。而且,窗口关联可以在其他基于 [窗口表值函数]({{< ref "docs/dev/table/sql/queries/window-tvf"
>}}) 的操作后使用,例如 [窗口聚合]({{< ref "docs/dev/table/sql/queries/window-agg" >}}),[窗口
Top-N]({{< ref "docs/dev/table/sql/queries/window-topn">}}) 和 [窗口关联]({{< ref
"docs/dev/table/sql/queries/window-join">}})。
+{{< hint info >}}
+注意:`SESSION` 窗口关联目前不支持批模式。
+{{< /hint >}}
+
目前,窗口关联需要在 join on 条件中包含两个输入表的 `window_start` 等值条件和 `window_end` 等值条件。
窗口关联支持 INNER/LEFT/RIGHT/FULL OUTER/ANTI/SEMI JOIN。
diff --git a/docs/content.zh/docs/dev/table/sql/queries/window-topn.md b/docs/content.zh/docs/dev/table/sql/queries/window-topn.md
index 95649bc7b91..1567c2f46d4 100644
--- a/docs/content.zh/docs/dev/table/sql/queries/window-topn.md
+++ b/docs/content.zh/docs/dev/table/sql/queries/window-topn.md
@@ -31,6 +31,10 @@ under the License.
因此,窗口 Top-N 适用于用户不需要每条数据都更新Top-N结果的场景,相对普通Top-N来说性能更好。通常,窗口 Top-N 直接用于
[窗口表值函数]({{< ref "docs/dev/table/sql/queries/window-tvf" >}})上。
另外,窗口 Top-N 可以用于基于 [窗口表值函数]({{< ref "docs/dev/table/sql/queries/window-tvf"
>}}) 的操作之上,比如 [窗口聚合]({{< ref "docs/dev/table/sql/queries/window-agg" >}}),[窗口
Top-N]({{< ref "docs/dev/table/sql/queries/window-topn">}}) 和 [窗口关联]({{< ref
"docs/dev/table/sql/queries/window-join">}})。
+{{< hint info >}}
+注意:`SESSION` 窗口 Top-N 目前不支持批模式。
+{{< /hint >}}
+
窗口 Top-N 的语法和普通的 Top-N 相同,更多信息参见:[Top-N 文档]({{< ref
"docs/dev/table/sql/queries/topn" >}})。
除此之外,窗口 Top-N 需要 `PARTITION BY` 子句包含 [窗口表值函数]({{< ref
"docs/dev/table/sql/queries/window-tvf" >}}) 或 [窗口聚合]({{< ref
"docs/dev/table/sql/queries/window-agg" >}}) 产生的 `window_start` 和 `window_end`。
否则优化器无法翻译。
diff --git a/docs/content.zh/docs/dev/table/sql/queries/window-tvf.md b/docs/content.zh/docs/dev/table/sql/queries/window-tvf.md
index 986aab222c2..2cf847a43d9 100644
--- a/docs/content.zh/docs/dev/table/sql/queries/window-tvf.md
+++ b/docs/content.zh/docs/dev/table/sql/queries/window-tvf.md
@@ -30,10 +30,10 @@ under the License.
Apache Flink 提供了如下 `窗口表值函数`(table-valued function, 缩写TVF)把表的数据划分到窗口中:
-- [滚动窗口](#tumble)
-- [滑动窗口](#hop)
-- [累积窗口](#cumulate)
-- 会话窗口 (即将支持)
+- [滚动窗口](#滚动窗口tumble)
+- [滑动窗口](#滑动窗口hop)
+- [累积窗口](#累积窗口cumulate)
+- [会话窗口](#会话窗口session) (目前仅支持流模式)
注意:逻辑上,每个元素可以应用于一个或多个窗口,这取决于所使用的 `窗口表值函数`。例如:滑动窗口可以把单个元素分配给多个窗口。
@@ -49,7 +49,7 @@ Apache Flink 提供了如下 `窗口表值函数`(table-valued function, 缩
## 窗口函数
-Apache Flink 提供3个内置的窗口表值函数:`TUMBLE`,`HOP` 和 `CUMULATE`。`窗口表值函数` 的返回值包括原生列和附加的三个指定窗口的列,分别是:“window_start”,“window_end”,“window_time”。
+Apache Flink 提供 4 个内置的窗口表值函数:`TUMBLE`,`HOP`,`CUMULATE` 和 `SESSION`。`窗口表值函数` 的返回值包括原生列和附加的三个指定窗口的列,分别是:“window_start”,“window_end”,“window_time”。
在流计算模式,`window_time` 是 `TIMESTAMP` 或者 `TIMESTAMP_LTZ`
类型(具体哪种类型取决于输入的时间字段类型)的字段。
`window_time` 字段用于后续基于时间的操作,例如:其他的窗口表值函数,或者<a href="{{< ref
"docs/dev/table/sql/queries/joins" >}}#interval-joins">interval joins</a>,<a
href="{{< ref "docs/dev/table/sql/queries/over-agg" >}}">over aggregations</a>。
它的值总是等于 `window_end - 1ms`。
@@ -122,16 +122,16 @@ Flink SQL> SELECT * FROM TABLE(
+------------------+-------+------+------------------+------------------+-------------------------+
-- apply aggregation on the tumbling windowed table
-Flink SQL> SELECT window_start, window_end, SUM(price)
+Flink SQL> SELECT window_start, window_end, SUM(price) AS total_price
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
-+------------------+------------------+-------+
-| window_start | window_end | price |
-+------------------+------------------+-------+
-| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
-| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
-+------------------+------------------+-------+
++------------------+------------------+-------------+
+| window_start | window_end | total_price |
++------------------+------------------+-------------+
+| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
+| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
++------------------+------------------+-------------+
```
*注意:为了更好地理解窗口行为,这里把 timestamp 值后面的 0 去掉了。例如:在 Flink SQL Client 中,如果类型是 `TIMESTAMP(3)`,`2020-04-15 08:05` 应该显示成 `2020-04-15 08:05:00.000`。*
@@ -193,18 +193,18 @@ HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])
+------------------+-------+------+------------------+------------------+-------------------------+
-- apply aggregation on the hopping windowed table
-> SELECT window_start, window_end, SUM(price)
+> SELECT window_start, window_end, SUM(price) AS total_price
FROM TABLE(
HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
-+------------------+------------------+-------+
-| window_start | window_end | price |
-+------------------+------------------+-------+
-| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
-| 2020-04-15 08:05 | 2020-04-15 08:15 | 15.00 |
-| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
-| 2020-04-15 08:15 | 2020-04-15 08:25 | 6.00 |
-+------------------+------------------+-------+
++------------------+------------------+-------------+
+| window_start | window_end | total_price |
++------------------+------------------+-------------+
+| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
+| 2020-04-15 08:05 | 2020-04-15 08:15 | 15.00 |
+| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
+| 2020-04-15 08:15 | 2020-04-15 08:25 | 6.00 |
++------------------+------------------+-------------+
```
### 累积窗口(CUMULATE)
@@ -271,22 +271,148 @@ CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)
+------------------+-------+------+------------------+------------------+-------------------------+
-- apply aggregation on the cumulating windowed table
-> SELECT window_start, window_end, SUM(price)
+> SELECT window_start, window_end, SUM(price) AS total_price
FROM TABLE(
CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
-+------------------+------------------+-------+
-| window_start | window_end | price |
-+------------------+------------------+-------+
-| 2020-04-15 08:00 | 2020-04-15 08:06 | 4.00 |
-| 2020-04-15 08:00 | 2020-04-15 08:08 | 6.00 |
-| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
-| 2020-04-15 08:10 | 2020-04-15 08:12 | 3.00 |
-| 2020-04-15 08:10 | 2020-04-15 08:14 | 4.00 |
-| 2020-04-15 08:10 | 2020-04-15 08:16 | 4.00 |
-| 2020-04-15 08:10 | 2020-04-15 08:18 | 10.00 |
-| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
-+------------------+------------------+-------+
++------------------+------------------+-------------+
+| window_start | window_end | total_price |
++------------------+------------------+-------------+
+| 2020-04-15 08:00 | 2020-04-15 08:06 | 4.00 |
+| 2020-04-15 08:00 | 2020-04-15 08:08 | 6.00 |
+| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
+| 2020-04-15 08:10 | 2020-04-15 08:12 | 3.00 |
+| 2020-04-15 08:10 | 2020-04-15 08:14 | 4.00 |
+| 2020-04-15 08:10 | 2020-04-15 08:16 | 4.00 |
+| 2020-04-15 08:10 | 2020-04-15 08:18 | 10.00 |
+| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
++------------------+------------------+-------------+
+```
+
+### 会话窗口(SESSION)
+
+{{< hint info >}}
+注意:
+1. 会话窗口函数目前不支持批模式。
+2. 会话窗口函数目前不支持 [性能调优]({{< ref "docs/dev/table/tuning" >}}) 中的任何优化。
+3. 会话窗口 Join 、会话窗口 Top-N 、会话窗口聚合功能目前理论可用,但仍处于实验阶段。遇到问题可以在 [JIRA](https://issues.apache.org/jira/browse/FLINK) 中报告。
+{{< /hint >}}
+
+会话窗口函数通过会话活动对元素进行分组。与滚动窗口和滑动窗口不同,会话窗口不重叠,也没有固定的开始和结束时间。
+一个会话窗口会在一定时间内没有收到元素时关闭,比如超过一定时间不处于活跃状态。
+会话窗口需要配置一个固定的会话间隙,以定义不活跃间隙的时长。
+当超出不活跃间隙的时候,当前的会话窗口将会关闭,随后的元素将被分配到一个新的会话窗口内。
+
+比如,定义一个不活跃间隙时长为 10 分钟的会话窗口。
+如果同一用户两个事件之间的时间间隔小于 10 分钟,这些事件将会被归入到同一个会话窗口中。
+如果在最新事件后的 10 分钟内没有数据,那么这个会话窗口将会关闭,并被发送到下游。
+随后的事件将会被分配到一个新的会话窗口里。
+
+{{< img src="/fig/session-windows.svg" alt="Session windows" width="70%">}}
+
+`SESSION` 函数通过时间属性字段为每一行数据分配了一个窗口。
+在流计算模式,这个时间属性字段必须被指定为 [事件或处理时间属性]({{< ref "docs/dev/table/concepts/time_attributes" >}})。
+`SESSION` 的返回值包括原始表的所有列和附加的三个用于指定窗口的列,分别是:“window_start”,“window_end”,“window_time”。函数运行后,原有的时间属性 “timecol” 将转换为一个常规的 timestamp 列。
+
+`SESSION` 有三个必填参数和一个可选参数:
+
+```sql
+SESSION(TABLE data [PARTITION BY(keycols, ...)], DESCRIPTOR(timecol), gap)
+```
+
+- `data`:拥有时间属性列的表。
+- `keycols`:列描述符,决定会话窗口应该使用哪些列来分区数据。
+- `timecol`:列描述符,决定数据的哪个时间属性列应该映射到窗口。
+- `gap`:两个事件被认为属于同一个会话窗口的最大时间间隔。
+
+下面是 Bid 表的调用示例:
+
+```sql
+-- tables must have time attribute, e.g. `bidtime` in this table
+Flink SQL> desc Bid;
++-------------+------------------------+------+-----+--------+---------------------------------+
+| name | type | null | key | extras | watermark |
++-------------+------------------------+------+-----+--------+---------------------------------+
+| bidtime | TIMESTAMP(3) *ROWTIME* | true | | | `bidtime` - INTERVAL '1' SECOND |
+| price | DECIMAL(10, 2) | true | | | |
+| item | STRING | true | | | |
++-------------+------------------------+------+-----+--------+---------------------------------+
+
+Flink SQL> SELECT * FROM Bid;
++------------------+-------+------+
+| bidtime | price | item |
++------------------+-------+------+
+| 2020-04-15 08:07 | 4.00 | A |
+| 2020-04-15 08:06 | 2.00 | A |
+| 2020-04-15 08:09 | 5.00 | B |
+| 2020-04-15 08:08 | 3.00 | A |
+| 2020-04-15 08:17 | 1.00 | B |
++------------------+-------+------+
+
+-- session window with partition keys
+> SELECT * FROM TABLE(
+ SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES));
+-- or with the named params
+-- note: the DATA param must be the first
+> SELECT * FROM TABLE(
+ SESSION(
+ DATA => TABLE Bid PARTITION BY item,
+ TIMECOL => DESCRIPTOR(bidtime),
+ GAP => INTERVAL '5' MINUTES));
++------------------+-------+------+------------------+------------------+-------------------------+
+| bidtime | price | item | window_start | window_end | window_time |
++------------------+-------+------+------------------+------------------+-------------------------+
+| 2020-04-15 08:07 | 4.00 | A | 2020-04-15 08:06 | 2020-04-15 08:13 | 2020-04-15 08:12:59.999 |
+| 2020-04-15 08:06 | 2.00 | A | 2020-04-15 08:06 | 2020-04-15 08:13 | 2020-04-15 08:12:59.999 |
+| 2020-04-15 08:08 | 3.00 | A | 2020-04-15 08:06 | 2020-04-15 08:13 | 2020-04-15 08:12:59.999 |
+| 2020-04-15 08:09 | 5.00 | B | 2020-04-15 08:09 | 2020-04-15 08:14 | 2020-04-15 08:13:59.999 |
+| 2020-04-15 08:17 | 1.00 | B | 2020-04-15 08:17 | 2020-04-15 08:22 | 2020-04-15 08:21:59.999 |
++------------------+-------+------+------------------+------------------+-------------------------+
+
+-- apply aggregation on the session windowed table with partition keys
+> SELECT window_start, window_end, item, SUM(price) AS total_price
+ FROM TABLE(
+ SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
+ GROUP BY item, window_start, window_end;
++------------------+------------------+------+-------------+
+| window_start | window_end | item | total_price |
++------------------+------------------+------+-------------+
+| 2020-04-15 08:06 | 2020-04-15 08:13 | A | 9.00 |
+| 2020-04-15 08:09 | 2020-04-15 08:14 | B | 5.00 |
+| 2020-04-15 08:17 | 2020-04-15 08:22 | B | 1.00 |
++------------------+------------------+------+-------------+
+
+-- session window without partition keys
+> SELECT * FROM TABLE(
+ SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES));
+-- or with the named params
+-- note: the DATA param must be the first
+> SELECT * FROM TABLE(
+ SESSION(
+ DATA => TABLE Bid,
+ TIMECOL => DESCRIPTOR(bidtime),
+ GAP => INTERVAL '5' MINUTES));
++------------------+-------+------+------------------+------------------+-------------------------+
+| bidtime | price | item | window_start | window_end | window_time |
++------------------+-------+------+------------------+------------------+-------------------------+
+| 2020-04-15 08:07 | 4.00 | A | 2020-04-15 08:06 | 2020-04-15 08:14 | 2020-04-15 08:13:59.999 |
+| 2020-04-15 08:06 | 2.00 | A | 2020-04-15 08:06 | 2020-04-15 08:14 | 2020-04-15 08:13:59.999 |
+| 2020-04-15 08:08 | 3.00 | A | 2020-04-15 08:06 | 2020-04-15 08:14 | 2020-04-15 08:13:59.999 |
+| 2020-04-15 08:09 | 5.00 | B | 2020-04-15 08:06 | 2020-04-15 08:14 | 2020-04-15 08:13:59.999 |
+| 2020-04-15 08:17 | 1.00 | B | 2020-04-15 08:17 | 2020-04-15 08:22 | 2020-04-15 08:21:59.999 |
++------------------+-------+------+------------------+------------------+-------------------------+
+
+-- apply aggregation on the session windowed table without partition keys
+> SELECT window_start, window_end, SUM(price) AS total_price
+ FROM TABLE(
+ SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
+ GROUP BY window_start, window_end;
++------------------+------------------+-------------+
+| window_start | window_end | total_price |
++------------------+------------------+-------------+
+| 2020-04-15 08:06 | 2020-04-15 08:14 | 14.00 |
+| 2020-04-15 08:17 | 2020-04-15 08:22 | 1.00 |
++------------------+------------------+-------------+
```
## 窗口偏移
@@ -331,16 +457,16 @@ Flink SQL> SELECT * FROM TABLE(
+------------------+-------+------+------------------+------------------+-------------------------+
-- apply aggregation on the tumbling windowed table
-Flink SQL> SELECT window_start, window_end, SUM(price)
+Flink SQL> SELECT window_start, window_end, SUM(price) AS total_price
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES, INTERVAL '1' MINUTES))
GROUP BY window_start, window_end;
-+------------------+------------------+-------+
-| window_start | window_end | price |
-+------------------+------------------+-------+
-| 2020-04-15 08:01 | 2020-04-15 08:11 | 11.00 |
-| 2020-04-15 08:11 | 2020-04-15 08:21 | 10.00 |
-+------------------+------------------+-------+
++------------------+------------------+-------------+
+| window_start | window_end | total_price |
++------------------+------------------+-------------+
+| 2020-04-15 08:01 | 2020-04-15 08:11 | 11.00 |
+| 2020-04-15 08:11 | 2020-04-15 08:21 | 10.00 |
++------------------+------------------+-------------+
```
*注意:为了更好地理解窗口行为,这里把 timestamp 值后面的 0 去掉了。例如:在 Flink SQL Client 中,如果类型是 `TIMESTAMP(3)`,`2020-04-15 08:05` 应该显示成 `2020-04-15 08:05:00.000`。*
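
The session-window results in the window-tvf.md hunk above can be checked by hand. The sketch below is a plain-Python simulation, not Flink code: `session_windows`, `BIDS`, `GAP`, and `ts` are hypothetical names, the rows are copied from the `SELECT * FROM Bid` output in that hunk, and the merge rule (a row joins the open session when its timestamp is at or before the session's current end) is an assumption inferred from the documented outputs rather than taken from Flink's implementation.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from decimal import Decimal


def ts(hhmm):
    """Build a timestamp on 2020-04-15 from an 'HH:MM' string."""
    return datetime.strptime("2020-04-15 " + hhmm, "%Y-%m-%d %H:%M")


# Rows copied from the Bid table in the hunk: (bidtime, price, item).
BIDS = [
    (ts("08:07"), Decimal("4.00"), "A"),
    (ts("08:06"), Decimal("2.00"), "A"),
    (ts("08:09"), Decimal("5.00"), "B"),
    (ts("08:08"), Decimal("3.00"), "A"),
    (ts("08:17"), Decimal("1.00"), "B"),
]
GAP = timedelta(minutes=5)


def session_windows(rows, gap, key=None):
    """Return (partition, window_start, window_end, total_price) tuples.

    Assumed rule: each row opens a window [bidtime, bidtime + gap); a row whose
    bidtime is at or before the open window's end extends that window.
    """
    groups = defaultdict(list)
    for row in rows:
        groups[key(row) if key else None].append(row)

    windows = []
    for part, members in groups.items():
        current = None  # [start, end, total] of the open session
        for bidtime, price, _item in sorted(members, key=lambda r: r[0]):
            if current is not None and bidtime <= current[1]:
                current[1] = max(current[1], bidtime + gap)
                current[2] += price
            else:
                if current is not None:
                    windows.append((part, *current))
                current = [bidtime, bidtime + gap, price]
        if current is not None:
            windows.append((part, *current))
    return sorted(windows, key=lambda w: (w[1], w[2]))


by_item = session_windows(BIDS, GAP, key=lambda r: r[2])  # PARTITION BY item
no_keys = session_windows(BIDS, GAP)                      # no partition keys

for part, start, end, total in by_item + no_keys:
    print(part, start.strftime("%H:%M"), end.strftime("%H:%M"), total)
```

Under these assumptions the partitioned call gives three windows (A: 08:06 to 08:13, B: 08:09 to 08:14, B: 08:17 to 08:22) and the unpartitioned call gives two (08:06 to 08:14 and 08:17 to 08:22), matching the aggregation tables in the hunk.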
diff --git a/docs/content.zh/docs/dev/table/tuning.md b/docs/content.zh/docs/dev/table/tuning.md
index 2250d2b275a..0fe999a57bd 100644
--- a/docs/content.zh/docs/dev/table/tuning.md
+++ b/docs/content.zh/docs/dev/table/tuning.md
@@ -32,7 +32,7 @@ SQL 是数据分析中使用最广泛的语言。Flink Table API 和 SQL 使用
在这一页,我们将介绍一些实用的优化选项以及流式聚合的内部原理,它们在某些情况下能带来很大的提升。
{{< hint info >}}
-The streaming aggregation optimizations mentioned in this page are all supported for [Group Aggregations]({{< ref "docs/dev/table/sql/queries/group-agg" >}}) and [Window TVF Aggregations]({{< ref "docs/dev/table/sql/queries/window-agg" >}}) now.
+目前 [分组聚合]({{< ref "docs/dev/table/sql/queries/group-agg" >}}) 和 [窗口表值函数聚合]({{< ref "docs/dev/table/sql/queries/window-agg" >}}) (会话窗口表值函数聚合除外)都支持本页提到的流式聚合优化。
{{< /hint >}}
diff --git a/docs/content/docs/dev/table/sql/queries/window-agg.md b/docs/content/docs/dev/table/sql/queries/window-agg.md
index c74e067627a..716a22c8aec 100644
--- a/docs/content/docs/dev/table/sql/queries/window-agg.md
+++ b/docs/content/docs/dev/table/sql/queries/window-agg.md
@@ -40,11 +40,15 @@ Unlike other aggregations on continuous tables, window aggregation do not emit i
### Windowing TVFs
-Flink supports `TUMBLE`, `HOP` and `CUMULATE` types of window aggregations.
+Flink supports `TUMBLE`, `HOP`, `CUMULATE` and `SESSION` types of window aggregations.
In streaming mode, the time attribute field of a window table-valued function must be on either [event or processing time attributes]({{< ref "docs/dev/table/concepts/time_attributes" >}}). See [Windowing TVF]({{< ref "docs/dev/table/sql/queries/window-tvf" >}}) for more windowing functions information.
In batch mode, the time attribute field of a window table-valued function must be an attribute of type `TIMESTAMP` or `TIMESTAMP_LTZ`.
-Here are some examples for `TUMBLE`, `HOP` and `CUMULATE` window aggregations.
+{{< hint info >}}
+Note: `SESSION` Window Aggregation is not supported in batch mode now.
+{{< /hint >}}
+
+Here are some examples for `TUMBLE`, `HOP`, `CUMULATE` and `SESSION` window aggregations.
```sql
-- tables must have time attribute, e.g. `bidtime` in this table
@@ -71,48 +75,74 @@ Flink SQL> SELECT * FROM Bid;
+------------------+-------+------+-------------+
-- tumbling window aggregation
-Flink SQL> SELECT window_start, window_end, SUM(price)
+Flink SQL> SELECT window_start, window_end, SUM(price) AS total_price
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
-+------------------+------------------+-------+
-| window_start | window_end | price |
-+------------------+------------------+-------+
-| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
-| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
-+------------------+------------------+-------+
++------------------+------------------+-------------+
+| window_start | window_end | total_price |
++------------------+------------------+-------------+
+| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
+| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
++------------------+------------------+-------------+
-- hopping window aggregation
-Flink SQL> SELECT window_start, window_end, SUM(price)
+Flink SQL> SELECT window_start, window_end, SUM(price) AS total_price
FROM TABLE(
HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
-+------------------+------------------+-------+
-| window_start | window_end | price |
-+------------------+------------------+-------+
-| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
-| 2020-04-15 08:05 | 2020-04-15 08:15 | 15.00 |
-| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
-| 2020-04-15 08:15 | 2020-04-15 08:25 | 6.00 |
-+------------------+------------------+-------+
++------------------+------------------+-------------+
+| window_start | window_end | total_price |
++------------------+------------------+-------------+
+| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
+| 2020-04-15 08:05 | 2020-04-15 08:15 | 15.00 |
+| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
+| 2020-04-15 08:15 | 2020-04-15 08:25 | 6.00 |
++------------------+------------------+-------------+
-- cumulative window aggregation
-Flink SQL> SELECT window_start, window_end, SUM(price)
+Flink SQL> SELECT window_start, window_end, SUM(price) AS total_price
FROM TABLE(
CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
-+------------------+------------------+-------+
-| window_start | window_end | price |
-+------------------+------------------+-------+
-| 2020-04-15 08:00 | 2020-04-15 08:06 | 4.00 |
-| 2020-04-15 08:00 | 2020-04-15 08:08 | 6.00 |
-| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
-| 2020-04-15 08:10 | 2020-04-15 08:12 | 3.00 |
-| 2020-04-15 08:10 | 2020-04-15 08:14 | 4.00 |
-| 2020-04-15 08:10 | 2020-04-15 08:16 | 4.00 |
-| 2020-04-15 08:10 | 2020-04-15 08:18 | 10.00 |
-| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
-+------------------+------------------+-------+
++------------------+------------------+-------------+
+| window_start | window_end | total_price |
++------------------+------------------+-------------+
+| 2020-04-15 08:00 | 2020-04-15 08:06 | 4.00 |
+| 2020-04-15 08:00 | 2020-04-15 08:08 | 6.00 |
+| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
+| 2020-04-15 08:10 | 2020-04-15 08:12 | 3.00 |
+| 2020-04-15 08:10 | 2020-04-15 08:14 | 4.00 |
+| 2020-04-15 08:10 | 2020-04-15 08:16 | 4.00 |
+| 2020-04-15 08:10 | 2020-04-15 08:18 | 10.00 |
+| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
++------------------+------------------+-------------+
+
+-- session window aggregation with partition keys
+Flink SQL> SELECT window_start, window_end, supplier_id, SUM(price) AS total_price
+ FROM TABLE(
+ SESSION(TABLE Bid PARTITION BY supplier_id, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES))
+ GROUP BY window_start, window_end, supplier_id;
++------------------+------------------+-------------+-------------+
+| window_start | window_end | supplier_id | total_price |
++------------------+------------------+-------------+-------------+
+| 2020-04-15 08:05 | 2020-04-15 08:09 | supplier1 | 6.00 |
+| 2020-04-15 08:09 | 2020-04-15 08:13 | supplier2 | 8.00 |
+| 2020-04-15 08:13 | 2020-04-15 08:15 | supplier1 | 1.00 |
+| 2020-04-15 08:17 | 2020-04-15 08:19 | supplier2 | 6.00 |
++------------------+------------------+-------------+-------------+
+
+-- session window aggregation without partition keys
+Flink SQL> SELECT window_start, window_end, SUM(price) AS total_price
+ FROM TABLE(
+ SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES))
+ GROUP BY window_start, window_end;
++------------------+------------------+-------------+
+| window_start | window_end | total_price |
++------------------+------------------+-------------+
+| 2020-04-15 08:05 | 2020-04-15 08:15 | 15.00 |
+| 2020-04-15 08:17 | 2020-04-15 08:19 | 6.00 |
++------------------+------------------+-------------+
```
*Note: in order to better understand the behavior of windowing, we simplify
the displaying of timestamp values to not show the trailing zeros, e.g.
`2020-04-15 08:05` should be displayed as `2020-04-15 08:05:00.000` in Flink
SQL Client if the type is `TIMESTAMP(3)`.*
@@ -124,20 +154,20 @@ Window aggregations also support `GROUPING SETS` syntax.
Grouping sets allow for
Window aggregations with `GROUPING SETS` require both the `window_start` and
`window_end` columns have to be in the `GROUP BY` clause, but not in the
`GROUPING SETS` clause.
```sql
-Flink SQL> SELECT window_start, window_end, supplier_id, SUM(price) as price
+Flink SQL> SELECT window_start, window_end, supplier_id, SUM(price) AS
total_price
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, GROUPING SETS ((supplier_id), ());
-+------------------+------------------+-------------+-------+
-| window_start | window_end | supplier_id | price |
-+------------------+------------------+-------------+-------+
-| 2020-04-15 08:00 | 2020-04-15 08:10 | (NULL) | 11.00 |
-| 2020-04-15 08:00 | 2020-04-15 08:10 | supplier2 | 5.00 |
-| 2020-04-15 08:00 | 2020-04-15 08:10 | supplier1 | 6.00 |
-| 2020-04-15 08:10 | 2020-04-15 08:20 | (NULL) | 10.00 |
-| 2020-04-15 08:10 | 2020-04-15 08:20 | supplier2 | 9.00 |
-| 2020-04-15 08:10 | 2020-04-15 08:20 | supplier1 | 1.00 |
-+------------------+------------------+-------------+-------+
++------------------+------------------+-------------+-------------+
+| window_start | window_end | supplier_id | total_price |
++------------------+------------------+-------------+-------------+
+| 2020-04-15 08:00 | 2020-04-15 08:10 | (NULL) | 11.00 |
+| 2020-04-15 08:00 | 2020-04-15 08:10 | supplier2 | 5.00 |
+| 2020-04-15 08:00 | 2020-04-15 08:10 | supplier1 | 6.00 |
+| 2020-04-15 08:10 | 2020-04-15 08:20 | (NULL) | 10.00 |
+| 2020-04-15 08:10 | 2020-04-15 08:20 | supplier2 | 9.00 |
+| 2020-04-15 08:10 | 2020-04-15 08:20 | supplier1 | 1.00 |
++------------------+------------------+-------------+-------------+
```
Each sublist of `GROUPING SETS` may specify zero or more columns or
expressions and is interpreted the same way as though used directly in the
`GROUP BY` clause. An empty grouping set means that all rows are aggregated
down to a single group, which is output even if no input rows were present.
@@ -153,7 +183,7 @@ Window aggregations with `ROLLUP` requires both the
`window_start` and `window_e
For example, the following query is equivalent to the one above.
```sql
-SELECT window_start, window_end, supplier_id, SUM(price) as price
+SELECT window_start, window_end, supplier_id, SUM(price) AS total_price
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, ROLLUP (supplier_id);
@@ -168,12 +198,12 @@ Window aggregations with `CUBE` requires both the
`window_start` and `window_end
For example, the following two queries are equivalent.
```sql
-SELECT window_start, window_end, item, supplier_id, SUM(price) as price
+SELECT window_start, window_end, item, supplier_id, SUM(price) AS total_price
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, CUBE (supplier_id, item);
-SELECT window_start, window_end, item, supplier_id, SUM(price) as price
+SELECT window_start, window_end, item, supplier_id, SUM(price) AS total_price
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, GROUPING SETS (
@@ -200,13 +230,13 @@ The following shows a cascading window aggregation where
the first window aggreg
-- tumbling 5 minutes for each supplier_id
CREATE VIEW window1 AS
-- Note: The window start and window end fields of inner Window TVF are
optional in the select clause. However, if they appear in the clause, they need
to be aliased to prevent name conflicting with the window start and window end
of the outer Window TVF.
-SELECT window_start as window_5mintumble_start, window_end as
window_5mintumble_end, window_time as rowtime, SUM(price) as partial_price
+SELECT window_start AS window_5mintumble_start, window_end AS
window_5mintumble_end, window_time AS rowtime, SUM(price) AS partial_price
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
GROUP BY supplier_id, window_start, window_end, window_time;
-- tumbling 10 minutes on the first window
-SELECT window_start, window_end, SUM(partial_price) as total_price
+SELECT window_start, window_end, SUM(partial_price) AS total_price
FROM TABLE(
TUMBLE(TABLE window1, DESCRIPTOR(rowtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
diff --git a/docs/content/docs/dev/table/sql/queries/window-join.md
b/docs/content/docs/dev/table/sql/queries/window-join.md
index d0c8d635689..d52364ae9b8 100644
--- a/docs/content/docs/dev/table/sql/queries/window-join.md
+++ b/docs/content/docs/dev/table/sql/queries/window-join.md
@@ -31,6 +31,10 @@ For streaming queries, unlike other joins on continuous
tables, window join does
Usually, Window Join is used with [Windowing TVF]({{< ref
"docs/dev/table/sql/queries/window-tvf" >}}). Besides, Window Join could follow
after other operations based on [Windowing TVF]({{< ref
"docs/dev/table/sql/queries/window-tvf" >}}), such as [Window Aggregation]({{<
ref "docs/dev/table/sql/queries/window-agg" >}}), [Window TopN]({{< ref
"docs/dev/table/sql/queries/window-topn">}}) and [Window Join]({{< ref
"docs/dev/table/sql/queries/window-join">}}).
+{{< hint info >}}
+Note: `SESSION` Window Join is currently not supported in batch mode.
+{{< /hint >}}
+
Currently, Window Join requires the join on condition contains window starts
equality of input tables and window ends equality of input tables.
Window Join supports INNER/LEFT/RIGHT/FULL OUTER/ANTI/SEMI JOIN.
diff --git a/docs/content/docs/dev/table/sql/queries/window-topn.md
b/docs/content/docs/dev/table/sql/queries/window-topn.md
index cf2341e8be9..8922f5ce6d7 100644
--- a/docs/content/docs/dev/table/sql/queries/window-topn.md
+++ b/docs/content/docs/dev/table/sql/queries/window-topn.md
@@ -30,6 +30,10 @@ Window Top-N is a special [Top-N]({{< ref
"docs/dev/table/sql/queries/topn" >}})
For streaming queries, unlike regular Top-N on continuous tables, window Top-N
does not emit intermediate results but only a final result, the total top N
records at the end of the window. Moreover, window Top-N purges all
intermediate state when no longer needed.
Therefore, window Top-N queries have better performance if users don't need
results updated per record. Usually, Window Top-N is used with [Windowing
TVF]({{< ref "docs/dev/table/sql/queries/window-tvf" >}}) directly. Besides,
Window Top-N could be used with other operations based on [Windowing TVF]({{<
ref "docs/dev/table/sql/queries/window-tvf" >}}), such as [Window
Aggregation]({{< ref "docs/dev/table/sql/queries/window-agg" >}}), [Window
TopN]({{< ref "docs/dev/table/sql/queries/wind [...]
+{{< hint info >}}
+Note: `SESSION` Window Top-N is currently not supported in batch mode.
+{{< /hint >}}
+
Window Top-N can be defined in the same syntax as regular Top-N, see [Top-N
documentation]({{< ref "docs/dev/table/sql/queries/topn" >}}) for more
information.
Besides that, Window Top-N requires the `PARTITION BY` clause contains
`window_start` and `window_end` columns of the relation applied [Windowing
TVF]({{< ref "docs/dev/table/sql/queries/window-tvf" >}}) or [Window
Aggregation]({{< ref "docs/dev/table/sql/queries/window-agg" >}}).
Otherwise, the optimizer won’t be able to translate the query.
diff --git a/docs/content/docs/dev/table/sql/queries/window-tvf.md
b/docs/content/docs/dev/table/sql/queries/window-tvf.md
index 712c27d2227..905038e46d5 100644
--- a/docs/content/docs/dev/table/sql/queries/window-tvf.md
+++ b/docs/content/docs/dev/table/sql/queries/window-tvf.md
@@ -33,7 +33,7 @@ Apache Flink provides several window table-valued functions
(TVF) to divide the
- [Tumble Windows](#tumble)
- [Hop Windows](#hop)
- [Cumulate Windows](#cumulate)
-- Session Windows (will be supported soon)
+- [Session Windows](#session) (currently supported only in streaming mode)
Note that each element can logically belong to more than one window, depending
on the windowing table-valued function you use. For example, HOP windowing
creates overlapping windows wherein a single element can be assigned to
multiple windows.
@@ -49,7 +49,7 @@ See more how to apply further computations based on windowing
TVF:
## Window Functions
-Apache Flink provides 3 built-in windowing TVFs: `TUMBLE`, `HOP` and
`CUMULATE`. The return value of windowing TVF is a new relation that includes
all columns of original relation as well as additional 3 columns named
"window_start", "window_end", "window_time" to indicate the assigned window.
+Apache Flink provides 4 built-in windowing TVFs: `TUMBLE`, `HOP`, `CUMULATE`
and `SESSION`. The return value of windowing TVF is a new relation that
includes all columns of original relation as well as additional 3 columns named
"window_start", "window_end", "window_time" to indicate the assigned window.
In streaming mode, the "window_time" field is a [time attributes]({{< ref
"docs/dev/table/concepts/time_attributes" >}}) of the window.
In batch mode, the "window_time" field is an attribute of type `TIMESTAMP` or
`TIMESTAMP_LTZ` based on input time field type.
The "window_time" field can be used in subsequent time-based operations, e.g.
another windowing TVF, or <a href="{{< ref "docs/dev/table/sql/queries/joins"
>}}#interval-joins">interval joins</a>, <a href="{{< ref
"docs/dev/table/sql/queries/over-agg" >}}">over aggregations</a>. The value of
`window_time` always equal to `window_end - 1ms`.
@@ -122,16 +122,16 @@ Flink SQL> SELECT * FROM TABLE(
+------------------+-------+------+------------------+------------------+-------------------------+
-- apply aggregation on the tumbling windowed table
-Flink SQL> SELECT window_start, window_end, SUM(price)
+Flink SQL> SELECT window_start, window_end, SUM(price) AS total_price
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
-+------------------+------------------+-------+
-| window_start | window_end | price |
-+------------------+------------------+-------+
-| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
-| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
-+------------------+------------------+-------+
++------------------+------------------+-------------+
+| window_start | window_end | total_price |
++------------------+------------------+-------------+
+| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
+| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
++------------------+------------------+-------------+
```
*Note: in order to better understand the behavior of windowing, we simplify
the displaying of timestamp values to not show the trailing zeros, e.g.
`2020-04-15 08:05` should be displayed as `2020-04-15 08:05:00.000` in Flink
SQL Client if the type is `TIMESTAMP(3)`.*
@@ -193,18 +193,18 @@ Here is an example invocation on the `Bid` table:
+------------------+-------+------+------------------+------------------+-------------------------+
-- apply aggregation on the hopping windowed table
-> SELECT window_start, window_end, SUM(price)
+> SELECT window_start, window_end, SUM(price) AS total_price
FROM TABLE(
HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10'
MINUTES))
GROUP BY window_start, window_end;
-+------------------+------------------+-------+
-| window_start | window_end | price |
-+------------------+------------------+-------+
-| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
-| 2020-04-15 08:05 | 2020-04-15 08:15 | 15.00 |
-| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
-| 2020-04-15 08:15 | 2020-04-15 08:25 | 6.00 |
-+------------------+------------------+-------+
++------------------+------------------+-------------+
+| window_start | window_end | total_price |
++------------------+------------------+-------------+
+| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
+| 2020-04-15 08:05 | 2020-04-15 08:15 | 15.00 |
+| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
+| 2020-04-15 08:15 | 2020-04-15 08:25 | 6.00 |
++------------------+------------------+-------------+
```
### CUMULATE
@@ -271,22 +271,149 @@ Here is an example invocation on the Bid table:
+------------------+-------+------+------------------+------------------+-------------------------+
-- apply aggregation on the cumulating windowed table
-> SELECT window_start, window_end, SUM(price)
+> SELECT window_start, window_end, SUM(price) AS total_price
FROM TABLE(
CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL
'10' MINUTES))
GROUP BY window_start, window_end;
-+------------------+------------------+-------+
-| window_start | window_end | price |
-+------------------+------------------+-------+
-| 2020-04-15 08:00 | 2020-04-15 08:06 | 4.00 |
-| 2020-04-15 08:00 | 2020-04-15 08:08 | 6.00 |
-| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
-| 2020-04-15 08:10 | 2020-04-15 08:12 | 3.00 |
-| 2020-04-15 08:10 | 2020-04-15 08:14 | 4.00 |
-| 2020-04-15 08:10 | 2020-04-15 08:16 | 4.00 |
-| 2020-04-15 08:10 | 2020-04-15 08:18 | 10.00 |
-| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
-+------------------+------------------+-------+
++------------------+------------------+-------------+
+| window_start | window_end | total_price |
++------------------+------------------+-------------+
+| 2020-04-15 08:00 | 2020-04-15 08:06 | 4.00 |
+| 2020-04-15 08:00 | 2020-04-15 08:08 | 6.00 |
+| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
+| 2020-04-15 08:10 | 2020-04-15 08:12 | 3.00 |
+| 2020-04-15 08:10 | 2020-04-15 08:14 | 4.00 |
+| 2020-04-15 08:10 | 2020-04-15 08:16 | 4.00 |
+| 2020-04-15 08:10 | 2020-04-15 08:18 | 10.00 |
+| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
++------------------+------------------+-------------+
+```
+
+### SESSION
+
+{{< hint info >}}
+Note:
+1. Session Window TVF is currently not supported in batch mode.
+2. Session Window Aggregation currently does not support any of the
optimizations described in [Performance Tuning]({{< ref "docs/dev/table/tuning" >}}).
+3. Session Window Join, Session Window TopN and Session Window Deduplication
are conceptually supported but still in beta. Issues can be reported in
[JIRA](https://issues.apache.org/jira/browse/FLINK).
+{{< /hint >}}
+
+The `SESSION` function groups elements by sessions of activity. In contrast to
`TUMBLE` windows and `HOP` windows, session windows do not overlap and do not
have a fixed start and end time.
+Instead, a `session` window closes when it doesn't receive elements for a
certain period of time, i.e., when a gap of inactivity occurs.
+A `session` window is configured with a static session gap that defines how
long the period of inactivity is.
+When this period expires, the current `session` window closes and subsequent
elements are assigned to a new `session` window.
+
+For example, you could define session windows with a gap of 10 minutes.
+With this setting, when the interval between two events of the same user is
less than 10 minutes, these events are grouped into the same `session` window.
+If no data arrives within 10 minutes of the latest event, this `session`
window closes and its result is sent downstream.
+Subsequent events will be assigned to a new `session` window.
+
+{{< img src="/fig/session-windows.svg" alt="Session windows" width="70%">}}
+
+The `SESSION` function assigns each row to a window based on a time attribute
field.
+In streaming mode, the time attribute field must be either an [event or
processing time attribute]({{< ref "docs/dev/table/concepts/time_attributes"
>}}).
+The return value of `SESSION` is a new relation that includes all columns of
the original relation as well as 3 additional columns named "window_start",
"window_end", "window_time" to indicate the assigned window.
+The original time attribute "timecol" will be a regular timestamp column after
the windowing TVF is applied.
+
+`SESSION` takes three required parameters and one optional parameter:
+
+```sql
+SESSION(TABLE data [PARTITION BY(keycols, ...)], DESCRIPTOR(timecol), gap)
+```
+
+- `data`: is a table parameter that can be any relation with a time attribute
column.
+- `keycols` (optional): is a column descriptor indicating which columns should
be used to partition the data before session windows are assigned.
+- `timecol`: is a column descriptor indicating which time attribute column of
the data should be mapped to session windows.
+- `gap`: is the maximum time interval between two events for them to be
considered part of the same session window.
+
+Here is an example invocation on the `Bid` table:
+
+```sql
+-- the table must have a time attribute, e.g. `bidtime` in this table
+Flink SQL> desc Bid;
++-------------+------------------------+------+-----+--------+---------------------------------+
+| name | type | null | key | extras |
watermark |
++-------------+------------------------+------+-----+--------+---------------------------------+
+| bidtime | TIMESTAMP(3) *ROWTIME* | true | | | `bidtime` -
INTERVAL '1' SECOND |
+| price | DECIMAL(10, 2) | true | | |
|
+| item | STRING | true | | |
|
++-------------+------------------------+------+-----+--------+---------------------------------+
+
+Flink SQL> SELECT * FROM Bid;
++------------------+-------+------+
+| bidtime | price | item |
++------------------+-------+------+
+| 2020-04-15 08:07 | 4.00 | A |
+| 2020-04-15 08:06 | 2.00 | A |
+| 2020-04-15 08:09 | 5.00 | B |
+| 2020-04-15 08:08 | 3.00 | A |
+| 2020-04-15 08:17 | 1.00 | B |
++------------------+-------+------+
+
+-- session window with partition keys
+> SELECT * FROM TABLE(
+ SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5'
MINUTES));
+-- or with the named params
+-- note: the DATA param must come first
+> SELECT * FROM TABLE(
+ SESSION(
+ DATA => TABLE Bid PARTITION BY item,
+ TIMECOL => DESCRIPTOR(bidtime),
+ GAP => INTERVAL '5' MINUTES));
++------------------+-------+------+------------------+------------------+-------------------------+
+| bidtime | price | item | window_start | window_end |
window_time |
++------------------+-------+------+------------------+------------------+-------------------------+
+| 2020-04-15 08:07 | 4.00 | A | 2020-04-15 08:06 | 2020-04-15 08:13 |
2020-04-15 08:12:59.999 |
+| 2020-04-15 08:06 | 2.00 | A | 2020-04-15 08:06 | 2020-04-15 08:13 |
2020-04-15 08:12:59.999 |
+| 2020-04-15 08:08 | 3.00 | A | 2020-04-15 08:06 | 2020-04-15 08:13 |
2020-04-15 08:12:59.999 |
+| 2020-04-15 08:09 | 5.00 | B | 2020-04-15 08:09 | 2020-04-15 08:14 |
2020-04-15 08:13:59.999 |
+| 2020-04-15 08:17 | 1.00 | B | 2020-04-15 08:17 | 2020-04-15 08:22 |
2020-04-15 08:21:59.999 |
++------------------+-------+------+------------------+------------------+-------------------------+
+
+-- apply aggregation on the session windowed table with partition keys
+> SELECT window_start, window_end, item, SUM(price) AS total_price
+ FROM TABLE(
+ SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5'
MINUTES))
+ GROUP BY item, window_start, window_end;
++------------------+------------------+------+-------------+
+| window_start | window_end | item | total_price |
++------------------+------------------+------+-------------+
+| 2020-04-15 08:06 | 2020-04-15 08:13 | A | 9.00 |
+| 2020-04-15 08:09 | 2020-04-15 08:14 | B | 5.00 |
+| 2020-04-15 08:17 | 2020-04-15 08:22 | B | 1.00 |
++------------------+------------------+------+-------------+
+
+-- session window without partition keys
+> SELECT * FROM TABLE(
+ SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES));
+-- or with the named params
+-- note: the DATA param must come first
+> SELECT * FROM TABLE(
+ SESSION(
+ DATA => TABLE Bid,
+ TIMECOL => DESCRIPTOR(bidtime),
+ GAP => INTERVAL '5' MINUTES));
++------------------+-------+------+------------------+------------------+-------------------------+
+| bidtime | price | item | window_start | window_end |
window_time |
++------------------+-------+------+------------------+------------------+-------------------------+
+| 2020-04-15 08:07 | 4.00 | A | 2020-04-15 08:06 | 2020-04-15 08:14 |
2020-04-15 08:13:59.999 |
+| 2020-04-15 08:06 | 2.00 | A | 2020-04-15 08:06 | 2020-04-15 08:14 |
2020-04-15 08:13:59.999 |
+| 2020-04-15 08:08 | 3.00 | A | 2020-04-15 08:06 | 2020-04-15 08:14 |
2020-04-15 08:13:59.999 |
+| 2020-04-15 08:09 | 5.00 | B | 2020-04-15 08:06 | 2020-04-15 08:14 |
2020-04-15 08:13:59.999 |
+| 2020-04-15 08:17 | 1.00 | B | 2020-04-15 08:17 | 2020-04-15 08:22 |
2020-04-15 08:21:59.999 |
++------------------+-------+------+------------------+------------------+-------------------------+
+
+-- apply aggregation on the session windowed table without partition keys
+> SELECT window_start, window_end, SUM(price) AS total_price
+ FROM TABLE(
+ SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
+ GROUP BY window_start, window_end;
++------------------+------------------+-------------+
+| window_start | window_end | total_price |
++------------------+------------------+-------------+
+| 2020-04-15 08:06 | 2020-04-15 08:14 | 14.00 |
+| 2020-04-15 08:17 | 2020-04-15 08:22 | 1.00 |
++------------------+------------------+-------------+
```
## Window Offset
@@ -331,16 +458,16 @@ Flink SQL> SELECT * FROM TABLE(
+------------------+-------+------+------------------+------------------+-------------------------+
-- apply aggregation on the tumbling windowed table
-Flink SQL> SELECT window_start, window_end, SUM(price)
+Flink SQL> SELECT window_start, window_end, SUM(price) AS total_price
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES, INTERVAL '1'
MINUTES))
GROUP BY window_start, window_end;
-+------------------+------------------+-------+
-| window_start | window_end | price |
-+------------------+------------------+-------+
-| 2020-04-15 08:01 | 2020-04-15 08:11 | 11.00 |
-| 2020-04-15 08:11 | 2020-04-15 08:21 | 10.00 |
-+------------------+------------------+-------+
++------------------+------------------+-------------+
+| window_start | window_end | total_price |
++------------------+------------------+-------------+
+| 2020-04-15 08:01 | 2020-04-15 08:11 | 11.00 |
+| 2020-04-15 08:11 | 2020-04-15 08:21 | 10.00 |
++------------------+------------------+-------------+
```
*Note: in order to better understand the behavior of windowing, we simplify
the displaying of timestamp values to not show the trailing zeros, e.g.
`2020-04-15 08:05` should be displayed as `2020-04-15 08:05:00.000` in Flink
SQL Client if the type is `TIMESTAMP(3)`.*
diff --git a/docs/content/docs/dev/table/tuning.md
b/docs/content/docs/dev/table/tuning.md
index 2b97e6afb62..b78c4687c36 100644
--- a/docs/content/docs/dev/table/tuning.md
+++ b/docs/content/docs/dev/table/tuning.md
@@ -31,7 +31,7 @@ SQL is the most widely used language for data analytics.
Flink's Table API and S
In this page, we will introduce some useful optimization options and the
internals of streaming aggregation which will bring great improvement in some
cases.
{{< hint info >}}
-The streaming aggregation optimizations mentioned in this page are all
supported for [Group Aggregations]({{< ref
"docs/dev/table/sql/queries/group-agg" >}}) and [Window TVF Aggregations]({{<
ref "docs/dev/table/sql/queries/window-agg" >}}) now.
+The streaming aggregation optimizations mentioned in this page are all
supported for [Group Aggregations]({{< ref
"docs/dev/table/sql/queries/group-agg" >}}) and [Window TVF Aggregations]({{<
ref "docs/dev/table/sql/queries/window-agg" >}}) (except Session Window TVF
Aggregation) now.
{{< /hint >}}