This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository
https://gitbox.apache.org/repos/asf/incubator-streampark-website.git
The following commit(s) were added to refs/heads/dev by this push:
new 42b6f3f0 add with declare and some example (#185)
42b6f3f0 is described below
commit 42b6f3f0090c792eeafbb532805a206b25db6c0f
Author: 第一片心意 <[email protected]>
AuthorDate: Sat Mar 4 21:09:28 2023 +0800
add with declare and some example (#185)
---
docs/flinksql/5-example.md | 567 ++++++++++++++++++++-
docs/flinksql/syntax/7-select.md | 20 +-
.../current/flinksql/5-example.md | 563 +++++++++++++++++++-
.../current/flinksql/syntax/7-select.md | 20 +-
.../flinksql/example/jdbc-lookup-join-running.png | Bin 0 -> 120642 bytes
.../image/flinksql/example/jdbc-lookup-join.png | Bin 0 -> 118966 bytes
.../image/flinksql/example/jdbc-regular-join.png | Bin 0 -> 100505 bytes
.../example/multy-stream-union-window-result.png | Bin 0 -> 64360 bytes
.../flinksql/example/multy-stream-union-window.png | Bin 0 -> 117235 bytes
.../example/multy-stream-window-union-result.png | Bin 0 -> 130576 bytes
.../example/multy-stream-window-union-ui1.png | Bin 0 -> 155062 bytes
.../example/multy-stream-window-union-ui2.png | Bin 0 -> 154200 bytes
.../image/flinksql/example/one-stream-window.png | Bin 0 -> 503633 bytes
.../image/flinksql/example/one-stream-window2.png | Bin 0 -> 57828 bytes
14 files changed, 1142 insertions(+), 28 deletions(-)
diff --git a/docs/flinksql/5-example.md b/docs/flinksql/5-example.md
index 8e90e1e5..b2731d1b 100644
--- a/docs/flinksql/5-example.md
+++ b/docs/flinksql/5-example.md
@@ -1,6 +1,6 @@
---
id: '5-example'
-title: 'Use Cases'
+title: '使用案例'
sidebar_position: 5
---
@@ -181,8 +181,7 @@ set 'table.dynamic-table-options.enabled' = 'true';
-- 创建catalog
create catalog hive with (
'type' = 'hive',
- 'hadoop-conf-dir' =
'D:\IDEAWorkspace\work\baishan\log\data-max\src\main\resources',
- 'hive-conf-dir' =
'D:\IDEAWorkspace\work\baishan\log\data-max\src\main\resources'
+ 'hadoop-conf-dir' = 'hdfs:///hadoop-conf'
)
;
@@ -272,8 +271,7 @@ set 'table.dynamic-table-options.enabled' = 'true';
-- 创建catalog
create catalog hive with (
'type' = 'hive',
- 'hadoop-conf-dir' =
'D:\IDEAWorkspace\work\baishan\log\data-max\src\main\resources',
- 'hive-conf-dir' =
'D:\IDEAWorkspace\work\baishan\log\data-max\src\main\resources'
+ 'hadoop-conf-dir' = 'hdfs:///hadoop-conf'
)
;
@@ -400,8 +398,7 @@ set 'table.local-time-zone' = 'GMT+08:00';
-- 创建catalog
create catalog hive with (
'type' = 'hive',
- 'hadoop-conf-dir' =
'D:\IDEAWorkspace\work\baishan\log\data-max\src\main\resources',
- 'hive-conf-dir' =
'D:\IDEAWorkspace\work\baishan\log\data-max\src\main\resources'
+ 'hadoop-conf-dir' = 'hdfs:///hadoop-conf'
)
;
@@ -427,8 +424,7 @@ set 'table.local-time-zone' = 'GMT+08:00';
-- 创建catalog
create catalog hive with (
'type' = 'hive',
- 'hadoop-conf-dir' =
'D:\IDEAWorkspace\work\baishan\log\data-max\src\main\resources',
- 'hive-conf-dir' =
'D:\IDEAWorkspace\work\baishan\log\data-max\src\main\resources'
+ 'hadoop-conf-dir' = 'hdfs:///hadoop-conf'
)
;
@@ -786,8 +782,7 @@ set 'table.local-time-zone' = 'GMT+08:00';
-- 创建catalog
create catalog hive with (
'type' = 'hive',
- 'hadoop-conf-dir' =
'D:\IDEAWorkspace\work\baishan\log\data-max\src\main\resources',
- 'hive-conf-dir' =
'D:\IDEAWorkspace\work\baishan\log\data-max\src\main\resources'
+ 'hadoop-conf-dir' = 'hdfs:///hadoop-conf'
)
;
@@ -810,8 +805,7 @@ set 'table.local-time-zone' = 'GMT+08:00';
-- 创建catalog
create catalog hive with (
'type' = 'hive',
- 'hadoop-conf-dir' =
'D:\IDEAWorkspace\work\baishan\log\data-max\src\main\resources',
- 'hive-conf-dir' =
'D:\IDEAWorkspace\work\baishan\log\data-max\src\main\resources'
+ 'hadoop-conf-dir' = 'hdfs:///hadoop-conf'
)
;
@@ -828,6 +822,551 @@ lateral view explode(col2) a as col
;
```
-## 写在最后
+# kafka join JDBC
+
+## 常规 join
+
+```sql
+create table source (
+ id integer comment '订单id',
+ goods_id integer comment '商品id',
+ goods_count integer comment '购买商品数量',
+ price_total double comment '总价'
+) with (
+ 'connector' = 'kafka',
+ 'topic' = 'source1',
+ 'properties.bootstrap.servers' = '${kafka-bootstrapserver}',
+ 'scan.startup.mode' = 'latest-offset',
+ 'properties.group.id' = 'test',
+ 'format' = 'csv',
+ 'csv.field-delimiter' = ' '
+)
+;
+
+create table dim_goods (
+ goods_id integer comment '商品id',
+ goods_name string comment '商品名称'
+) with (
+ 'connector' = 'jdbc',
+ 'url' = 'jdbc:mysql://hadoop01:3306/test',
+ 'username' = 'test',
+ 'password' = 'test',
+ 'table-name' = 'dim_goods'
+)
+;
+
+create table sink (
+ id integer comment '订单id',
+ goods_id integer comment '商品id',
+ goods_name string comment '商品名称',
+ goods_count integer comment '购买商品数量',
+ price_total double comment '总价'
+) with (
+ 'connector' = 'kafka',
+ 'topic' = 'sink1',
+ 'properties.bootstrap.servers' = '${kafka-bootstrapserver}',
+ 'format' = 'csv',
+ 'csv.field-delimiter' = ' '
+)
+;
+
+insert into sink
+select id, a.goods_id, b.goods_name, a.goods_count, a.price_total
+from source as a
+join dim_goods as b
+on a.goods_id = b.goods_id
+;
+```
+
+直接使用常规 join ,发现任务运行之后,JDBC 对应的 source,会直接运行一次,加载完 mysql 表中的所有数据,然后 task 完成。
+
+Flink UI 界面:
+
+
+
+可以看到,MySql 表中的所有数据只会被完全加载一次,然后缓存到 Flink 内存中,之后对 MySql 表中的数据的更改,并不会影响运行中 flink
任务的结果。
+
+通过不断往 kafka 中发送数据,可以证明这一点:
+
+* 发送维表中有对应 id 的数据,维表中找到多条,则会产生多条结果。
+* 发送维表中没有对应 id 的数据,则不会发送结果,和 join 的预期结果一致。
+
+## lookup join
+
+```sql
+set pipeline.operator-chaining = false;
+
+create table source (
+ id integer comment '订单id',
+ goods_id integer comment '商品id',
+ goods_count integer comment '购买商品数量',
+ price_total double comment '总价',
+ proctime as proctime()
+) with (
+ 'connector' = 'kafka',
+ 'topic' = 'source1',
+ 'properties.bootstrap.servers' = '${kafka-bootstrapserver}',
+ 'scan.startup.mode' = 'latest-offset',
+ 'properties.group.id' = 'test',
+ 'format' = 'csv',
+ 'csv.field-delimiter' = ' '
+)
+;
+
+create table dim_goods (
+ goods_id integer comment '商品id',
+ goods_name string comment '商品名称'
+) with (
+ 'connector' = 'jdbc',
+ 'url' = 'jdbc:mysql://hadoop01:3306/test',
+ 'username' = 'test',
+ 'password' = 'test',
+ 'table-name' = 'dim_goods'
+)
+;
+
+create table sink (
+ id integer comment '订单id',
+ goods_id integer comment '商品id',
+ goods_name string comment '商品名称',
+ goods_count integer comment '购买商品数量',
+ price_total double comment '总价'
+) with (
+ 'connector' = 'kafka',
+ 'topic' = 'sink1',
+ 'properties.bootstrap.servers' = '${kafka-bootstrapserver}',
+ 'format' = 'csv',
+ 'csv.field-delimiter' = ' '
+)
+;
+
+insert into sink
+select id, a.goods_id, b.goods_name, a.goods_count, a.price_total
+from source as a
+join dim_goods FOR SYSTEM_TIME AS OF a.proctime as b
+on a.goods_id = b.goods_id
+;
+```
+
+flink 任务运行的 Flink UI 界面:
+
+
+
+可以看到,在没有接收到 kafka 数据时,并没有去 mysql 维表中加载维表数据,而且任务执行图中也显示出现在执行的是 LookupJoin。
+
+往 kafka 中发送数据,会出现以下情况:
+
+* kafka 中的数据可以在维表中找到对应 id 的数据,也就是可以关联上,则会进行一次处理,而且所有执行节点处理的数据量均为 1.
+* kafka 中的数据在维表中找不到对应 id 的数据,也就是说关联不上,则只有 source 执行节点接收到了数据,后面的 LookupJoin
执行节点没有接收到数据,也就是说没有找到对应 id 的数据,后续也不进行处理,如下图:
+ 
+
+**注**:lookup join 方式并不会缓存维表中的数据。
+
+不过 JDBC 给 lookup join 提供了 lookup cache 功能,可以通过下面这两个 JDBC 参数开启:
+
+```sql
+'lookup.cache.max-rows' = '10000',
+'lookup.cache.ttl' = '1 min'
+```
+
+上面参数二选一即可,也可同时设置,缓存的数据满足其中一个条件,就会过期,然后被清除。
+
+**注**:如果在关联维表时还有一些对维表数据的过滤,则可以直接将条件写到 on 条件之后,使用 `and`
关键字连接即可。不推荐创建为表的视图,在视图里面提前对数据进行过滤,这会涉及到 `primary key` 相关的问题。示例如下:
+
+```sql
+from source as a
+join dim_goods FOR SYSTEM_TIME AS OF a.proctime as b
+on a.goods_id = b.goods_id and dt in (2)
+```
+
+其中,`dt` 字段是维表中的字段。
+
+# kafka 开窗
+
+## 单流开窗统计
+
+由于该案例涉及到了维表关联,所以先创建了一个视图,用来完成维表关联,之后在视图的基础上进行开窗累加统计。
+
+```sql
+-- set pipeline.operator-chaining = false;
+
+create table source (
+ id integer comment '订单id',
+ goods_id integer comment '商品id',
+ goods_count integer comment '购买商品数量',
+ price_total double comment '总价',
+ proctime as proctime()
+) with (
+ 'connector' = 'kafka',
+ 'topic' = 'source1',
+ 'properties.bootstrap.servers' = '${kafka-bootstrapserver}',
+ 'scan.startup.mode' = 'latest-offset',
+ 'properties.group.id' = 'test',
+ 'format' = 'csv',
+ 'csv.field-delimiter' = ' '
+)
+;
+
+create table dim_goods (
+ goods_id integer comment '商品id',
+ goods_name string comment '商品名称',
+ dt integer comment '分区'
+) with (
+ 'connector' = 'jdbc',
+ 'url' = 'jdbc:mysql://hadoop01:3306/test',
+ 'username' = 'test',
+ 'password' = 'test',
+ 'table-name' = 'dim_goods',
+ 'lookup.cache.max-rows' = '10000',
+ 'lookup.cache.ttl' = '1 min'
+)
+;
+
+
+create table sink (
+ id integer comment '订单id',
+ goods_id integer comment '商品id',
+ goods_name string comment '商品名称',
+ goods_count integer comment '购买商品数量',
+ price_total double comment '总价',
+ window_start timestamp(3) comment '窗口开始时间',
+ window_end timestamp(3) comment '窗口结束时间',
+ primary key(id, goods_id, goods_name, window_start, window_end) not
enforced
+) with (
+ 'connector' = 'upsert-kafka',
+ 'topic' = 'sink1',
+ 'properties.bootstrap.servers' = '${kafka-bootstrapserver}',
+ 'key.format' = 'csv',
+ 'value.format' = 'csv',
+ 'value.csv.field-delimiter' = ' '
+)
+;
+
+create view middle as
+select id, a.goods_id, b.goods_name, a.goods_count, a.price_total, a.proctime
+from source as a
+join dim_goods FOR SYSTEM_TIME AS OF a.proctime as b
+on a.goods_id = b.goods_id and dt in (1)
+;
+
+insert into sink
+select id, goods_id, goods_name, sum(goods_count) as goods_count,
sum(price_total) as price_total, window_start, window_end
+from
+ table(
+ cumulate(table middle, descriptor(proctime), interval '1' minutes,
interval '1' day)
+ )
+group by id, goods_id, goods_name, window_start, window_end
+;
+```
+
+**注**:该案例使用的渐进式窗口,在 `group by`
进行累加时,必须将窗口开始时间和结束时间字段都添加上,否则渐进式窗口会在每次接收到实时数据后做结果输出,而且会输出后续所有窗口的结果。如果最终结果不需要窗口时间字段,可以在外面再包一层,只挑选自己需要的字段。
+
+如果聚合条件只写了 window_end ,而没有写 window_start,则结果输出为
+
+
+
+可以看到,后续所有涉及到的窗口结果,都被输出了,而且是每接收到 kafka 一条数据,就会触发计算并输出结果。
+
+如果聚合条件中把窗口开始时间和结束时间都写上的话,则会输出理想的结果,如下图所示
+
+
+
+每次到达窗口结束时间时,不管上游 kafka 是否有新数据,会触发对应窗口计算,并且输出对应窗口的结果。
+
+## 多流合并开窗统计
+
+### 将多源合并之后开窗
+
+这种方式的结果是正确的。
+
+```sql
+-- set pipeline.operator-chaining = false;
+
+create table source1 (
+ id integer comment '订单id',
+ goods_id integer comment '商品id',
+ goods_count integer comment '购买商品数量',
+ price_total double comment '总价',
+ proctime as proctime()
+) with (
+ 'connector' = 'kafka',
+ 'topic' = 'source1',
+ 'properties.bootstrap.servers' = '${kafka-bootstrapserver}',
+ 'scan.startup.mode' = 'latest-offset',
+ 'properties.group.id' = 'test',
+ 'format' = 'csv',
+ 'csv.field-delimiter' = ' '
+)
+;
+create table source2 (
+ id integer comment '订单id',
+ goods_id integer comment '商品id',
+ goods_count integer comment '购买商品数量',
+ price_total double comment '总价',
+ proctime as proctime()
+) with (
+ 'connector' = 'kafka',
+ 'topic' = 'source2',
+ 'properties.bootstrap.servers' = '${kafka-bootstrapserver}',
+ 'scan.startup.mode' = 'latest-offset',
+ 'properties.group.id' = 'test',
+ 'format' = 'csv',
+ 'csv.field-delimiter' = ' '
+)
+;
+
+create table dim_goods (
+ goods_id integer comment '商品id',
+ goods_name string comment '商品名称',
+ dt integer comment '分区'
+) with (
+ 'connector' = 'jdbc',
+ 'url' = 'jdbc:mysql://hadoop01:3306/test',
+ 'username' = 'test',
+ 'password' = 'test',
+ 'table-name' = 'dim_goods',
+ 'lookup.cache.max-rows' = '10000',
+ 'lookup.cache.ttl' = '1 min'
+)
+;
+
+
+create table sink (
+ id integer comment '订单id',
+ goods_id integer comment '商品id',
+ goods_name string comment '商品名称',
+ goods_count integer comment '购买商品数量',
+ price_total double comment '总价',
+ window_start timestamp(3) comment '窗口开始时间',
+ window_end timestamp(3) comment '窗口结束时间',
+ primary key(id, goods_id, goods_name, window_start, window_end) not
enforced
+) with (
+ 'connector' = 'upsert-kafka',
+ 'topic' = 'sink1',
+ 'properties.bootstrap.servers' = '${kafka-bootstrapserver}',
+ 'key.format' = 'csv',
+ 'value.format' = 'csv',
+ 'value.csv.field-delimiter' = ' '
+)
+;
+
+create view middle as
+ select id, a.goods_id, b.goods_name, a.goods_count, a.price_total,
a.proctime
+ from source1 as a
+ join dim_goods FOR SYSTEM_TIME AS OF a.proctime as b
+ on a.goods_id = b.goods_id and dt in (1)
+union all
+ select id, a.goods_id, b.goods_name, a.goods_count, a.price_total,
a.proctime
+ from source2 as a
+ join dim_goods FOR SYSTEM_TIME AS OF a.proctime as b
+ on a.goods_id = b.goods_id and dt in (1)
+;
+
+
+insert into sink
+select id, goods_id, goods_name, sum(goods_count) as goods_count,
sum(price_total) as price_total, window_start, window_end
+from
+ table(
+ cumulate(table middle, descriptor(proctime), interval '1' minutes,
interval '1' day)
+ )
+group by id, goods_id, goods_name, window_start, window_end
+;
+```
+
+上面的 sql 中,首先将两个 source 流中的数据进行各自维表关联打宽,然后合并到一起,合并必须使用 union all,否则 proctime
时间属性特性会丢失,下面的开窗会无法使用。
+
+之后对合并之后的视图进行开窗统计,经过测试,发现是理想的结果。上面 flink sql 任务对应 flink UI 界面为:
+
+
+
+不管上游 kafka 是否会继续发送数据,每次到达小窗口触发时间,都会输出正确的计算结果,结果如下:
+
+从结果中可以看到,每次小窗口触发计算之后,都会输出对应窗口的结果,而且是正确的结果。
+
+### 分别开窗之后再合并开窗-错误结果
+
+这种方式的结果是错误的。
+
+```sql
+-- set pipeline.operator-chaining = false;
+
+create table source1 (
+ id integer comment '订单id',
+ goods_id integer comment '商品id',
+ goods_count integer comment '购买商品数量',
+ price_total double comment '总价',
+ proctime as proctime()
+) with (
+ 'connector' = 'kafka',
+ 'topic' = 'source1',
+ 'properties.bootstrap.servers' = '${kafka-bootstrapserver}',
+ 'scan.startup.mode' = 'latest-offset',
+ 'properties.group.id' = 'test',
+ 'format' = 'csv',
+ 'csv.field-delimiter' = ' '
+)
+;
+create table source2 (
+ id integer comment '订单id',
+ goods_id integer comment '商品id',
+ goods_count integer comment '购买商品数量',
+ price_total double comment '总价',
+ proctime as proctime()
+) with (
+ 'connector' = 'kafka',
+ 'topic' = 'source2',
+ 'properties.bootstrap.servers' = '${kafka-bootstrapserver}',
+ 'scan.startup.mode' = 'latest-offset',
+ 'properties.group.id' = 'test',
+ 'format' = 'csv',
+ 'csv.field-delimiter' = ' '
+)
+;
+
+create table dim_goods (
+ goods_id integer comment '商品id',
+ goods_name string comment '商品名称',
+ dt integer comment '分区'
+) with (
+ 'connector' = 'jdbc',
+ 'url' = 'jdbc:mysql://hadoop01:3306/test',
+ 'username' = 'test',
+ 'password' = 'test',
+ 'table-name' = 'dim_goods',
+ 'lookup.cache.max-rows' = '10000',
+ 'lookup.cache.ttl' = '1 min'
+)
+;
+
+
+create table sink (
+ id integer comment '订单id',
+ goods_id integer comment '商品id',
+ goods_name string comment '商品名称',
+ goods_count integer comment '购买商品数量',
+ price_total double comment '总价',
+ window_start timestamp(3) comment '窗口开始时间',
+ window_end timestamp(3) comment '窗口结束时间',
+ primary key(id, goods_id, goods_name, window_start, window_end) not
enforced
+) with (
+ 'connector' = 'upsert-kafka',
+ 'topic' = 'sink1',
+ 'properties.bootstrap.servers' = '${kafka-bootstrapserver}',
+ 'key.format' = 'csv',
+ 'value.format' = 'csv',
+ 'value.csv.field-delimiter' = ' '
+)
+;
+
+create view middle1 as
+ select id, a.goods_id, b.goods_name, a.goods_count, a.price_total,
a.proctime
+ from source1 as a
+ join dim_goods FOR SYSTEM_TIME AS OF a.proctime as b
+ on a.goods_id = b.goods_id
+;
+create view middle2 as
+ select id, a.goods_id, b.goods_name, a.goods_count, a.price_total,
a.proctime
+ from source2 as a
+ join dim_goods FOR SYSTEM_TIME AS OF a.proctime as b
+ on a.goods_id = b.goods_id
+;
+
+create view result1 as
+select id, goods_id, goods_name, sum(goods_count) as goods_count,
sum(price_total) as price_total, max(proctime) as proctime, window_start,
window_end
+from
+ table(
+ cumulate(table middle1, descriptor(proctime), interval '1' minutes,
interval '1' day)
+ )
+group by id, goods_id, goods_name, window_start, window_end
+;
+create view result2 as
+select id, goods_id, goods_name, sum(goods_count) as goods_count,
sum(price_total) as price_total, max(proctime) as proctime, window_start,
window_end
+from
+ table(
+ cumulate(table middle2, descriptor(proctime), interval '1' minutes,
interval '1' day)
+ )
+group by id, goods_id, goods_name, window_start, window_end
+;
+-- 需要重新注册处理时间,上面的处理时间属性字段已经不可用了
+create view result_union as
+select id, goods_id, goods_name, goods_count, price_total, proctime() as
proctime
+from result1
+union all
+select id, goods_id, goods_name, goods_count, price_total, proctime() as
proctime
+from result2
+;
+
+insert into sink
+select id, goods_id, goods_name, sum(goods_count) as goods_count,
sum(price_total) as price_total, window_start, window_end
+from
+ table(
+ cumulate(table result_union, descriptor(proctime), interval '1'
minutes, interval '1' day)
+ )
+group by id, goods_id, goods_name, window_start, window_end
+;
+```
+
+由于第一次开窗使用了源中的 proctime 这个处理时间属性字段,所以下面再次开窗时,这个字段的时间属性已经丢失了,所以在 union all
两个源开窗合并的结果时,需要重新注册处理时间属性字段,之后使用该字段进行二次开窗统计。但是由于第一次开窗之后的结果对应的处理时间已经超过了对应的窗口结束时间,因此新注册的处理时间已经超过了上一个窗口的结束时间,下面再次开窗统计时,数据将会在下一次窗口内统计,所以最终的结果时间,已经是下一个窗口的时间了,时间明显滞后一个窗口时间。
+
+另一个错误是:我上面使用的是渐进式窗口,因此第一个窗口会在每个小窗口结束时发送最新计算结果,而且不管上游的 kafka
有没有新数据,都会发送结果。如此一来,第二次的开窗,会不断的接收到第一次开窗的结果数据,所以第二次开窗中,除了第一个窗口,后面的窗口计算结果都错了,他们一直在累加。
+
+我只往两个 kafka 源中发送了一次数据,之后再也没发送过数据,但是每次小窗口被触发之后,都会进行累加,具体结果如下:
+
+
+
+Flink UI 界面两次窗口时间处理数据量如下:
+
+
+
+
+
+从两张图中可以看出,在两次窗口触发时间中,第一次开窗对应的两个计算节点的输入数据都是 3,没有变化,但是输出数据量都从 1 变成
2,而且最后那个计算节点,也就是第二次开窗,接收的数据从 2 变成了
4,因此最终的结果输出,第二次的结果就是第一次结果的二倍。**这一点,大家在具体使用中一定要注意,不可以将多个数据源第一次开窗的结果合并之后再次进行开窗**。我上面使用的是渐进式窗口,滚动窗口理论上应该不会出现重复累加的问题,但是最终的结果在窗口时间上应该会滞后一个窗口时间。
+
+# WITH 子句
+
+`with` 子句只能在一个 `select` 语句上面使用,不可单独编写,并且添加英文分号,将其作为一个单独的公共表达式,然后在多个 `select`
语句中使用。如果想实现在多个 `select` 语句中使用同一个公共表达式,可以通过创建临时视图来解决。
+
+示例:
+
+```sql
+create temporary table source(
+ s1 string,
+ s2 string
+) with (
+ 'connector' = 'kafka',
+ 'topic' = 'wzq_source',
+ 'properties.bootstrap.servers' = '${kafka-bootstrapserver}',
+ 'properties.group.id' = 'test-1',
+ 'scan.startup.mode' = 'latest-offset',
+ 'scan.topic-partition-discovery.interval' = '1 h',
+ 'format' = 'csv'
+);
+
+create temporary table sink(
+ s1 string
+) with (
+ 'connector' = 'kafka',
+ 'topic' = 'wzq_sink',
+ 'properties.bootstrap.servers' = '${kafka-bootstrapserver}',
+ 'format' = 'csv'
+);
+
+insert into sink
+with with1 as (
+select concat(s1, '|', s2) as w1
+from source
+),
+with2 as (
+select concat(w1, '-', 'w2') as w2
+from with1
+)
+select w2
+from with2
+;
+```
+
+
+
+## 写在最后
若大家有优秀的使用案例的话,也可向社区反馈,以丰富更多的案例。
diff --git a/docs/flinksql/syntax/7-select.md b/docs/flinksql/syntax/7-select.md
index eeca68a9..5b058171 100644
--- a/docs/flinksql/syntax/7-select.md
+++ b/docs/flinksql/syntax/7-select.md
@@ -103,7 +103,7 @@ SELECT ... FROM ...;
with_item_name (column_name[, ...n]) AS ( <select_query> )
```
-使用案例:
+使用案例1:
```sql
WITH orders_with_total AS (
@@ -115,8 +115,26 @@ FROM orders_with_total
GROUP BY order_id;
```
+使用案例2:
+
+```sql
+WITH orders_with_total1 AS (
+ SELECT order_id, price + tax AS total
+ FROM Orders
+),
+orders_with_total2 AS (
+    SELECT order_id, price + tax AS total
+ FROM Orders
+)
+SELECT order_id, SUM(total)
+FROM orders_with_total1
+GROUP BY order_id;
+```
+
上面的`with`子句定义了`orders_with_total`,并且在`group by`子句中使用了它。
+> `with` 子句只能在一个 `select` 语句上面使用,不可单独编写,并且添加英文分号,将其作为一个单独的公共表达式,然后在多个 `select`
语句中使用。如果想实现在多个 `select` 语句中使用同一个公共表达式,可以通过创建临时视图来解决。
+
## SELECT和WHERE
在流批模式任务中均可使用。
diff --git
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/flinksql/5-example.md
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/flinksql/5-example.md
index 7071f521..3a4b69bb 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/flinksql/5-example.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/flinksql/5-example.md
@@ -181,8 +181,7 @@ set 'table.dynamic-table-options.enabled' = 'true';
-- 创建catalog
create catalog hive with (
'type' = 'hive',
- 'hadoop-conf-dir' =
'D:\IDEAWorkspace\work\baishan\log\data-max\src\main\resources',
- 'hive-conf-dir' =
'D:\IDEAWorkspace\work\baishan\log\data-max\src\main\resources'
+ 'hadoop-conf-dir' = 'hdfs:///hadoop-conf'
)
;
@@ -272,8 +271,7 @@ set 'table.dynamic-table-options.enabled' = 'true';
-- 创建catalog
create catalog hive with (
'type' = 'hive',
- 'hadoop-conf-dir' =
'D:\IDEAWorkspace\work\baishan\log\data-max\src\main\resources',
- 'hive-conf-dir' =
'D:\IDEAWorkspace\work\baishan\log\data-max\src\main\resources'
+ 'hadoop-conf-dir' = 'hdfs:///hadoop-conf'
)
;
@@ -400,8 +398,7 @@ set 'table.local-time-zone' = 'GMT+08:00';
-- 创建catalog
create catalog hive with (
'type' = 'hive',
- 'hadoop-conf-dir' =
'D:\IDEAWorkspace\work\baishan\log\data-max\src\main\resources',
- 'hive-conf-dir' =
'D:\IDEAWorkspace\work\baishan\log\data-max\src\main\resources'
+ 'hadoop-conf-dir' = 'hdfs:///hadoop-conf'
)
;
@@ -427,8 +424,7 @@ set 'table.local-time-zone' = 'GMT+08:00';
-- 创建catalog
create catalog hive with (
'type' = 'hive',
- 'hadoop-conf-dir' =
'D:\IDEAWorkspace\work\baishan\log\data-max\src\main\resources',
- 'hive-conf-dir' =
'D:\IDEAWorkspace\work\baishan\log\data-max\src\main\resources'
+ 'hadoop-conf-dir' = 'hdfs:///hadoop-conf'
)
;
@@ -786,8 +782,7 @@ set 'table.local-time-zone' = 'GMT+08:00';
-- 创建catalog
create catalog hive with (
'type' = 'hive',
- 'hadoop-conf-dir' =
'D:\IDEAWorkspace\work\baishan\log\data-max\src\main\resources',
- 'hive-conf-dir' =
'D:\IDEAWorkspace\work\baishan\log\data-max\src\main\resources'
+ 'hadoop-conf-dir' = 'hdfs:///hadoop-conf'
)
;
@@ -810,8 +805,7 @@ set 'table.local-time-zone' = 'GMT+08:00';
-- 创建catalog
create catalog hive with (
'type' = 'hive',
- 'hadoop-conf-dir' =
'D:\IDEAWorkspace\work\baishan\log\data-max\src\main\resources',
- 'hive-conf-dir' =
'D:\IDEAWorkspace\work\baishan\log\data-max\src\main\resources'
+ 'hadoop-conf-dir' = 'hdfs:///hadoop-conf'
)
;
@@ -828,6 +822,551 @@ lateral view explode(col2) a as col
;
```
+# kafka join JDBC
+
+## 常规 join
+
+```sql
+create table source (
+ id integer comment '订单id',
+ goods_id integer comment '商品id',
+ goods_count integer comment '购买商品数量',
+ price_total double comment '总价'
+) with (
+ 'connector' = 'kafka',
+ 'topic' = 'source1',
+ 'properties.bootstrap.servers' = '${kafka-bootstrapserver}',
+ 'scan.startup.mode' = 'latest-offset',
+ 'properties.group.id' = 'test',
+ 'format' = 'csv',
+ 'csv.field-delimiter' = ' '
+)
+;
+
+create table dim_goods (
+ goods_id integer comment '商品id',
+ goods_name string comment '商品名称'
+) with (
+ 'connector' = 'jdbc',
+ 'url' = 'jdbc:mysql://hadoop01:3306/test',
+ 'username' = 'test',
+ 'password' = 'test',
+ 'table-name' = 'dim_goods'
+)
+;
+
+create table sink (
+ id integer comment '订单id',
+ goods_id integer comment '商品id',
+ goods_name string comment '商品名称',
+ goods_count integer comment '购买商品数量',
+ price_total double comment '总价'
+) with (
+ 'connector' = 'kafka',
+ 'topic' = 'sink1',
+ 'properties.bootstrap.servers' = '${kafka-bootstrapserver}',
+ 'format' = 'csv',
+ 'csv.field-delimiter' = ' '
+)
+;
+
+insert into sink
+select id, a.goods_id, b.goods_name, a.goods_count, a.price_total
+from source as a
+join dim_goods as b
+on a.goods_id = b.goods_id
+;
+```
+
+直接使用常规 join ,发现任务运行之后,JDBC 对应的 source,会直接运行一次,加载完 mysql 表中的所有数据,然后 task 完成。
+
+Flink UI 界面:
+
+
+
+可以看到,MySql 表中的所有数据只会被完全加载一次,然后缓存到 Flink 内存中,之后对 MySql 表中的数据的更改,并不会影响运行中 flink
任务的结果。
+
+通过不断往 kafka 中发送数据,可以证明这一点:
+
+* 发送维表中有对应 id 的数据,维表中找到多条,则会产生多条结果。
+* 发送维表中没有对应 id 的数据,则不会发送结果,和 join 的预期结果一致。
+
+## lookup join
+
+```sql
+set pipeline.operator-chaining = false;
+
+create table source (
+ id integer comment '订单id',
+ goods_id integer comment '商品id',
+ goods_count integer comment '购买商品数量',
+ price_total double comment '总价',
+ proctime as proctime()
+) with (
+ 'connector' = 'kafka',
+ 'topic' = 'source1',
+ 'properties.bootstrap.servers' = '${kafka-bootstrapserver}',
+ 'scan.startup.mode' = 'latest-offset',
+ 'properties.group.id' = 'test',
+ 'format' = 'csv',
+ 'csv.field-delimiter' = ' '
+)
+;
+
+create table dim_goods (
+ goods_id integer comment '商品id',
+ goods_name string comment '商品名称'
+) with (
+ 'connector' = 'jdbc',
+ 'url' = 'jdbc:mysql://hadoop01:3306/test',
+ 'username' = 'test',
+ 'password' = 'test',
+ 'table-name' = 'dim_goods'
+)
+;
+
+create table sink (
+ id integer comment '订单id',
+ goods_id integer comment '商品id',
+ goods_name string comment '商品名称',
+ goods_count integer comment '购买商品数量',
+ price_total double comment '总价'
+) with (
+ 'connector' = 'kafka',
+ 'topic' = 'sink1',
+ 'properties.bootstrap.servers' = '${kafka-bootstrapserver}',
+ 'format' = 'csv',
+ 'csv.field-delimiter' = ' '
+)
+;
+
+insert into sink
+select id, a.goods_id, b.goods_name, a.goods_count, a.price_total
+from source as a
+join dim_goods FOR SYSTEM_TIME AS OF a.proctime as b
+on a.goods_id = b.goods_id
+;
+```
+
+flink 任务运行的 Flink UI 界面:
+
+
+
+可以看到,在没有接收到 kafka 数据时,并没有去 mysql 维表中加载维表数据,而且任务执行图中也显示出现在执行的是 LookupJoin。
+
+往 kafka 中发送数据,会出现以下情况:
+
+* kafka 中的数据可以在维表中找到对应 id 的数据,也就是可以关联上,则会进行一次处理,而且所有执行节点处理的数据量均为 1.
+* kafka 中的数据在维表中找不到对应 id 的数据,也就是说关联不上,则只有 source 执行节点接收到了数据,后面的 LookupJoin
执行节点没有接收到数据,也就是说没有找到对应 id 的数据,后续也不进行处理,如下图:
+ 
+
+**注**:lookup join 方式并不会缓存维表中的数据。
+
+不过 JDBC 给 lookup join 提供了 lookup cache 功能,可以通过下面这两个 JDBC 参数开启:
+
+```sql
+'lookup.cache.max-rows' = '10000',
+'lookup.cache.ttl' = '1 min'
+```
+
+上面参数二选一即可,也可同时设置,缓存的数据满足其中一个条件,就会过期,然后被清除。
+
+**注**:如果在关联维表时还有一些对维表数据的过滤,则可以直接将条件写到 on 条件之后,使用 `and`
关键字连接即可。不推荐创建为表的视图,在视图里面提前对数据进行过滤,这会涉及到 `primary key` 相关的问题。示例如下:
+
+```sql
+from source as a
+join dim_goods FOR SYSTEM_TIME AS OF a.proctime as b
+on a.goods_id = b.goods_id and dt in (2)
+```
+
+其中,`dt` 字段是维表中的字段。
+
+# kafka 开窗
+
+## 单流开窗统计
+
+由于该案例涉及到了维表关联,所以先创建了一个视图,用来完成维表关联,之后在视图的基础上进行开窗累加统计。
+
+```sql
+-- set pipeline.operator-chaining = false;
+
+create table source (
+ id integer comment '订单id',
+ goods_id integer comment '商品id',
+ goods_count integer comment '购买商品数量',
+ price_total double comment '总价',
+ proctime as proctime()
+) with (
+ 'connector' = 'kafka',
+ 'topic' = 'source1',
+ 'properties.bootstrap.servers' = '${kafka-bootstrapserver}',
+ 'scan.startup.mode' = 'latest-offset',
+ 'properties.group.id' = 'test',
+ 'format' = 'csv',
+ 'csv.field-delimiter' = ' '
+)
+;
+
+create table dim_goods (
+ goods_id integer comment '商品id',
+ goods_name string comment '商品名称',
+ dt integer comment '分区'
+) with (
+ 'connector' = 'jdbc',
+ 'url' = 'jdbc:mysql://hadoop01:3306/test',
+ 'username' = 'test',
+ 'password' = 'test',
+ 'table-name' = 'dim_goods',
+ 'lookup.cache.max-rows' = '10000',
+ 'lookup.cache.ttl' = '1 min'
+)
+;
+
+
+create table sink (
+ id integer comment '订单id',
+ goods_id integer comment '商品id',
+ goods_name string comment '商品名称',
+ goods_count integer comment '购买商品数量',
+ price_total double comment '总价',
+ window_start timestamp(3) comment '窗口开始时间',
+ window_end timestamp(3) comment '窗口结束时间',
+ primary key(id, goods_id, goods_name, window_start, window_end) not
enforced
+) with (
+ 'connector' = 'upsert-kafka',
+ 'topic' = 'sink1',
+ 'properties.bootstrap.servers' = '${kafka-bootstrapserver}',
+ 'key.format' = 'csv',
+ 'value.format' = 'csv',
+ 'value.csv.field-delimiter' = ' '
+)
+;
+
+create view middle as
+select id, a.goods_id, b.goods_name, a.goods_count, a.price_total, a.proctime
+from source as a
+join dim_goods FOR SYSTEM_TIME AS OF a.proctime as b
+on a.goods_id = b.goods_id and dt in (1)
+;
+
+insert into sink
+select id, goods_id, goods_name, sum(goods_count) as goods_count,
sum(price_total) as price_total, window_start, window_end
+from
+ table(
+ cumulate(table middle, descriptor(proctime), interval '1' minutes,
interval '1' day)
+ )
+group by id, goods_id, goods_name, window_start, window_end
+;
+```
+
+**注**:该案例使用的渐进式窗口,在 `group by`
进行累加时,必须将窗口开始时间和结束时间字段都添加上,否则渐进式窗口会在每次接收到实时数据后做结果输出,而且会输出后续所有窗口的结果。如果最终结果不需要窗口时间字段,可以在外面再包一层,只挑选自己需要的字段。
+
+如果聚合条件只写了 window_end ,而没有写 window_start,则结果输出为
+
+
+
+可以看到,后续所有涉及到的窗口结果,都被输出了,而且是每接收到 kafka 一条数据,就会触发计算并输出结果。
+
+如果聚合条件中把窗口开始时间和结束时间都写上的话,则会输出理想的结果,如下图所示
+
+
+
+每次到达窗口结束时间时,不管上游 kafka 是否有新数据,会触发对应窗口计算,并且输出对应窗口的结果。
+
+## 多流合并开窗统计
+
+### 将多源合并之后开窗
+
+这种方式的结果是正确的。
+
+```sql
+-- set pipeline.operator-chaining = false;
+
+create table source1 (
+ id integer comment '订单id',
+ goods_id integer comment '商品id',
+ goods_count integer comment '购买商品数量',
+ price_total double comment '总价',
+ proctime as proctime()
+) with (
+ 'connector' = 'kafka',
+ 'topic' = 'source1',
+ 'properties.bootstrap.servers' = '${kafka-bootstrapserver}',
+ 'scan.startup.mode' = 'latest-offset',
+ 'properties.group.id' = 'test',
+ 'format' = 'csv',
+ 'csv.field-delimiter' = ' '
+)
+;
+create table source2 (
+ id integer comment '订单id',
+ goods_id integer comment '商品id',
+ goods_count integer comment '购买商品数量',
+ price_total double comment '总价',
+ proctime as proctime()
+) with (
+ 'connector' = 'kafka',
+ 'topic' = 'source2',
+ 'properties.bootstrap.servers' = '${kafka-bootstrapserver}',
+ 'scan.startup.mode' = 'latest-offset',
+ 'properties.group.id' = 'test',
+ 'format' = 'csv',
+ 'csv.field-delimiter' = ' '
+)
+;
+
+create table dim_goods (
+ goods_id integer comment '商品id',
+ goods_name string comment '商品名称',
+ dt integer comment '分区'
+) with (
+ 'connector' = 'jdbc',
+ 'url' = 'jdbc:mysql://hadoop01:3306/test',
+ 'username' = 'test',
+ 'password' = 'test',
+ 'table-name' = 'dim_goods',
+ 'lookup.cache.max-rows' = '10000',
+ 'lookup.cache.ttl' = '1 min'
+)
+;
+
+
+create table sink (
+ id integer comment '订单id',
+ goods_id integer comment '商品id',
+ goods_name string comment '商品名称',
+ goods_count integer comment '购买商品数量',
+ price_total double comment '总价',
+ window_start timestamp(3) comment '窗口开始时间',
+ window_end timestamp(3) comment '窗口结束时间',
+ primary key(id, goods_id, goods_name, window_start, window_end) not
enforced
+) with (
+ 'connector' = 'upsert-kafka',
+ 'topic' = 'sink1',
+ 'properties.bootstrap.servers' = '${kafka-bootstrapserver}',
+ 'key.format' = 'csv',
+ 'value.format' = 'csv',
+ 'value.csv.field-delimiter' = ' '
+)
+;
+
+create view middle as
+ select id, a.goods_id, b.goods_name, a.goods_count, a.price_total,
a.proctime
+ from source1 as a
+ join dim_goods FOR SYSTEM_TIME AS OF a.proctime as b
+ on a.goods_id = b.goods_id and dt in (1)
+union all
+ select id, a.goods_id, b.goods_name, a.goods_count, a.price_total,
a.proctime
+ from source2 as a
+ join dim_goods FOR SYSTEM_TIME AS OF a.proctime as b
+ on a.goods_id = b.goods_id and dt in (1)
+;
+
+
+insert into sink
+select id, goods_id, goods_name, sum(goods_count) as goods_count,
sum(price_total) as price_total, window_start, window_end
+from
+ table(
+ cumulate(table middle, descriptor(proctime), interval '1' minutes,
interval '1' day)
+ )
+group by id, goods_id, goods_name, window_start, window_end
+;
+```
+
+上面的 sql 中,首先将两个 source 流中的数据进行各自维表关联打宽,然后合并到一起,合并必须使用 union all,否则 proctime
时间属性特性会丢失,下面的开窗会无法使用。
+
+之后对合并之后的视图进行开窗统计,经过测试,发现是理想的结果。上面 flink sql 任务对应 flink UI 界面为:
+
+
+
+不管上游 kafka 是否会继续发送数据,每次到达小窗口触发时间,都会输出正确的计算结果,结果如下:
+
+从结果中可以看到,每次小窗口触发计算之后,都会输出对应窗口的结果,而且是正确的结果。
+
+### 分别开窗之后再合并开窗-错误结果
+
+这种方式的结果是错误的。
+
+```sql
+-- set pipeline.operator-chaining = false;
+
+create table source1 (
+ id integer comment '订单id',
+ goods_id integer comment '商品id',
+ goods_count integer comment '购买商品数量',
+ price_total double comment '总价',
+ proctime as proctime()
+) with (
+ 'connector' = 'kafka',
+ 'topic' = 'source1',
+ 'properties.bootstrap.servers' = '${kafka-bootstrapserver}',
+ 'scan.startup.mode' = 'latest-offset',
+ 'properties.group.id' = 'test',
+ 'format' = 'csv',
+ 'csv.field-delimiter' = ' '
+)
+;
+create table source2 (
+ id integer comment '订单id',
+ goods_id integer comment '商品id',
+ goods_count integer comment '购买商品数量',
+ price_total double comment '总价',
+ proctime as proctime()
+) with (
+ 'connector' = 'kafka',
+ 'topic' = 'source2',
+ 'properties.bootstrap.servers' = '${kafka-bootstrapserver}',
+ 'scan.startup.mode' = 'latest-offset',
+ 'properties.group.id' = 'test',
+ 'format' = 'csv',
+ 'csv.field-delimiter' = ' '
+)
+;
+
+create table dim_goods (
+ goods_id integer comment '商品id',
+ goods_name string comment '商品名称',
+ dt integer comment '分区'
+) with (
+ 'connector' = 'jdbc',
+ 'url' = 'jdbc:mysql://hadoop01:3306/test',
+ 'username' = 'test',
+ 'password' = 'test',
+ 'table-name' = 'dim_goods',
+ 'lookup.cache.max-rows' = '10000',
+ 'lookup.cache.ttl' = '1 min'
+)
+;
+
+
+create table sink (
+ id integer comment '订单id',
+ goods_id integer comment '商品id',
+ goods_name string comment '商品名称',
+ goods_count integer comment '购买商品数量',
+ price_total double comment '总价',
+ window_start timestamp(3) comment '窗口开始时间',
+ window_end timestamp(3) comment '窗口结束时间',
+ primary key(id, goods_id, goods_name, window_start, window_end) not
enforced
+) with (
+ 'connector' = 'upsert-kafka',
+ 'topic' = 'sink1',
+ 'properties.bootstrap.servers' = '${kafka-bootstrapserver}',
+ 'key.format' = 'csv',
+ 'value.format' = 'csv',
+ 'value.csv.field-delimiter' = ' '
+)
+;
+
+create view middle1 as
+ select id, a.goods_id, b.goods_name, a.goods_count, a.price_total,
a.proctime
+ from source1 as a
+ join dim_goods FOR SYSTEM_TIME AS OF a.proctime as b
+ on a.goods_id = b.goods_id
+;
+create view middle2 as
+ select id, a.goods_id, b.goods_name, a.goods_count, a.price_total,
a.proctime
+ from source2 as a
+ join dim_goods FOR SYSTEM_TIME AS OF a.proctime as b
+ on a.goods_id = b.goods_id
+;
+
+create view result1 as
+select id, goods_id, goods_name, sum(goods_count) as goods_count,
sum(price_total) as price_total, max(proctime) as proctime, window_start,
window_end
+from
+ table(
+ cumulate(table middle1, descriptor(proctime), interval '1' minutes,
interval '1' day)
+ )
+group by id, goods_id, goods_name, window_start, window_end
+;
+create view result2 as
+select id, goods_id, goods_name, sum(goods_count) as goods_count,
sum(price_total) as price_total, max(proctime) as proctime, window_start,
window_end
+from
+ table(
+ cumulate(table middle2, descriptor(proctime), interval '1' minutes,
interval '1' day)
+ )
+group by id, goods_id, goods_name, window_start, window_end
+;
+-- 需要重新注册处理时间,上面的处理时间属性字段已经不可用了
+create view result_union as
+select id, goods_id, goods_name, goods_count, price_total, proctime() as
proctime
+from result1
+union all
+select id, goods_id, goods_name, goods_count, price_total, proctime() as
proctime
+from result2
+;
+
+insert into sink
+select id, goods_id, goods_name, sum(goods_count) as goods_count,
sum(price_total) as price_total, window_start, window_end
+from
+ table(
+ cumulate(table result_union, descriptor(proctime), interval '1'
minutes, interval '1' day)
+ )
+group by id, goods_id, goods_name, window_start, window_end
+;
+```
+
+由于第一次开窗使用了源中的 proctime 这个处理时间属性字段,所以下面再次开窗时,这个字段的时间属性已经丢失了,所以在 union all
两个源开窗合并的结果时,需要重新注册处理时间属性字段,之后使用该字段进行二次开窗统计。但是由于第一次开窗之后的结果对应的处理时间已经超过了对应的窗口结束时间,因此新注册的处理时间已经超过了上一个窗口的结束时间,下面再次开窗统计时,数据将会再下一次窗口内统计,所以最终的结果时间,已经是下一个窗口的时间了,时间明显滞后一个窗口时间。
+
+另一个错误是:我上面使用的是渐进式窗口,因此第一个窗口会在每个小窗口结束时发送最新计算结果,而且不管上游的 kafka
有没有新数据,都会发送结果。如此一来,第二次的开窗,会不断的接收到第一次开窗的结果数据,所以第二次开窗中,除了第一个窗口,后面的窗口计算结果都错了,它们一直在累加。
+
+我只往两个 kafka 源中发送了一次数据,之后再也没发送过数据,但是每次小窗口被触发之后,都会进行累加,具体结果如下:
+
+
+
+Flink UI 界面两次窗口时间处理数据量如下:
+
+
+
+
+
+从两张图中可以看出,在两次窗口触发时间中,第一次开窗对应的两个计算节点的输入数据都是 3,没有变化,但是输出数据量都从 1 变成
2,而且最后那个计算节点,也就是第二次开窗,接收的数据从 2 变成了
4,因此最终的结果输出,第二次的结果就是第一次结果的二倍。**这一点,大家在具体使用中一定要注意,不可以将多个数据源第一次开窗的结果合并之后再次进行开窗**。我上面使用的是渐进式窗口,滚动窗口理论上应该不会出现重复累加的问题,但是最终的结果在窗口时间上应该会滞后一个窗口时间。
+
+# WITH 子句
+
+`with` 子句只能依附在一个 `select` 语句上使用,不可以单独编写并以英文分号结尾,作为一个独立的公共表达式供多个 `select`
语句使用。如果想在多个 `select` 语句中使用同一个公共表达式,可以通过创建临时视图来解决。
+
+示例:
+
+```sql
+create temporary table source(
+ s1 string,
+ s2 string
+) with (
+ 'connector' = 'kafka',
+ 'topic' = 'wzq_source',
+ 'properties.bootstrap.servers' = '${kafka-bootstrapserver}',
+ 'properties.group.id' = 'test-1',
+ 'scan.startup.mode' = 'latest-offset',
+ 'scan.topic-partition-discovery.interval' = '1 h',
+ 'format' = 'csv'
+);
+
+create temporary table sink(
+ s1 string
+) with (
+ 'connector' = 'kafka',
+ 'topic' = 'wzq_sink',
+ 'properties.bootstrap.servers' = '${kafka-bootstrapserver}',
+ 'format' = 'csv'
+);
+
+insert into sink
+with with1 as (
+select concat(s1, '|', s2) as w1
+from source
+),
+with2 as (
+select concat(w1, '-', 'w2') as w2
+from with1
+)
+select w2
+from with2
+;
+```
+
+
+
## 写在最后
若大家有优秀的使用案例的话,也可向社区反馈,以丰富更多的案例。
diff --git
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/flinksql/syntax/7-select.md
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/flinksql/syntax/7-select.md
index 0aaeb800..c7b859dc 100644
---
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/flinksql/syntax/7-select.md
+++
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/flinksql/syntax/7-select.md
@@ -103,7 +103,7 @@ SELECT ... FROM ...;
with_item_name (column_name[, ...n]) AS ( <select_query> )
```
-使用案例:
+使用案例1:
```sql
WITH orders_with_total AS (
@@ -115,8 +115,26 @@ FROM orders_with_total
GROUP BY order_id;
```
+使用案例2:
+
+```sql
+WITH orders_with_total1 AS (
+  SELECT order_id, price + tax AS total
+  FROM Orders
+),
+orders_with_total2 AS (
+  SELECT concat(order_id, '-suffix') AS order_id, total
+  FROM orders_with_total1
+)
+SELECT order_id, SUM(total)
+FROM orders_with_total2
+GROUP BY order_id;
+```
+
上面的`with`子句定义了`orders_with_total`,并且在`group by`子句中使用了它。
+> `with` 子句只能依附在一个 `select` 语句上使用,不可以单独编写并以英文分号结尾,作为一个独立的公共表达式供多个 `select`
语句使用。如果想在多个 `select` 语句中使用同一个公共表达式,可以通过创建临时视图来解决。
+
## SELECT和WHERE
在流批模式任务中均可使用。
diff --git a/static/doc/image/flinksql/example/jdbc-lookup-join-running.png
b/static/doc/image/flinksql/example/jdbc-lookup-join-running.png
new file mode 100644
index 00000000..4c785ad5
Binary files /dev/null and
b/static/doc/image/flinksql/example/jdbc-lookup-join-running.png differ
diff --git a/static/doc/image/flinksql/example/jdbc-lookup-join.png
b/static/doc/image/flinksql/example/jdbc-lookup-join.png
new file mode 100644
index 00000000..a72b8aa9
Binary files /dev/null and
b/static/doc/image/flinksql/example/jdbc-lookup-join.png differ
diff --git a/static/doc/image/flinksql/example/jdbc-regular-join.png
b/static/doc/image/flinksql/example/jdbc-regular-join.png
new file mode 100644
index 00000000..94ca2945
Binary files /dev/null and
b/static/doc/image/flinksql/example/jdbc-regular-join.png differ
diff --git
a/static/doc/image/flinksql/example/multy-stream-union-window-result.png
b/static/doc/image/flinksql/example/multy-stream-union-window-result.png
new file mode 100644
index 00000000..2d7d8625
Binary files /dev/null and
b/static/doc/image/flinksql/example/multy-stream-union-window-result.png differ
diff --git a/static/doc/image/flinksql/example/multy-stream-union-window.png
b/static/doc/image/flinksql/example/multy-stream-union-window.png
new file mode 100644
index 00000000..b5044276
Binary files /dev/null and
b/static/doc/image/flinksql/example/multy-stream-union-window.png differ
diff --git
a/static/doc/image/flinksql/example/multy-stream-window-union-result.png
b/static/doc/image/flinksql/example/multy-stream-window-union-result.png
new file mode 100644
index 00000000..d9c6b3d6
Binary files /dev/null and
b/static/doc/image/flinksql/example/multy-stream-window-union-result.png differ
diff --git
a/static/doc/image/flinksql/example/multy-stream-window-union-ui1.png
b/static/doc/image/flinksql/example/multy-stream-window-union-ui1.png
new file mode 100644
index 00000000..140715c4
Binary files /dev/null and
b/static/doc/image/flinksql/example/multy-stream-window-union-ui1.png differ
diff --git
a/static/doc/image/flinksql/example/multy-stream-window-union-ui2.png
b/static/doc/image/flinksql/example/multy-stream-window-union-ui2.png
new file mode 100644
index 00000000..0d2189c8
Binary files /dev/null and
b/static/doc/image/flinksql/example/multy-stream-window-union-ui2.png differ
diff --git a/static/doc/image/flinksql/example/one-stream-window.png
b/static/doc/image/flinksql/example/one-stream-window.png
new file mode 100644
index 00000000..282faedd
Binary files /dev/null and
b/static/doc/image/flinksql/example/one-stream-window.png differ
diff --git a/static/doc/image/flinksql/example/one-stream-window2.png
b/static/doc/image/flinksql/example/one-stream-window2.png
new file mode 100644
index 00000000..0b962864
Binary files /dev/null and
b/static/doc/image/flinksql/example/one-stream-window2.png differ