[
https://issues.apache.org/jira/browse/FLINK-34238?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
xuyang updated FLINK-34238:
---------------------------
Description:
Take the following plan as an example:
{code:java}
Calc(select=[window_start, window_end, a, EXPR$3, EXPR$4, EXPR$5, wAvg, uv])
+- WindowAggregate(groupBy=[a], window=[SESSION(win_start=[window_start],
win_end=[window_end], gap=[5 min], partition keys=[a])], select=[a, COUNT(*) AS
EXPR$3, SUM(d) AS EXPR$4, MAX(d) FILTER $f4 AS EXPR$5, weightedAvg(b, e) AS
wAvg, COUNT(DISTINCT c) AS uv, start('w$) AS window_start, end('w$) AS
window_end])
+- Exchange(distribution=[hash[a]])
+- Calc(select=[a, window_start, window_end, d, IS TRUE(>(b, 1000)) AS
$f4, b, e, c], where=[>=(window_start, 2021-01-01 10:10:00)])
+- WindowTableFunction(window=[SESSION(time_col=[rowtime], gap=[5
min], partition keys=[a])])
+- Exchange(distribution=[hash[a]])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime,
1000:INTERVAL SECOND)])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, e, rowtime]) {code}
If the node `WindowTableFunction`, `Calc` and `WindowAggregate` can be chained
finally, theĀ `Exchange` between `Calc` and `WindowAggregate` can be removed.
> In streaming mode, redundant exchange nodes can be optimally deleted in some
> cases
> ----------------------------------------------------------------------------------
>
> Key: FLINK-34238
> URL: https://issues.apache.org/jira/browse/FLINK-34238
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Planner
> Reporter: xuyang
> Priority: Minor
>
> Take the following plan as an example:
> {code:java}
> Calc(select=[window_start, window_end, a, EXPR$3, EXPR$4, EXPR$5, wAvg, uv])
> +- WindowAggregate(groupBy=[a], window=[SESSION(win_start=[window_start],
> win_end=[window_end], gap=[5 min], partition keys=[a])], select=[a, COUNT(*)
> AS EXPR$3, SUM(d) AS EXPR$4, MAX(d) FILTER $f4 AS EXPR$5, weightedAvg(b, e)
> AS wAvg, COUNT(DISTINCT c) AS uv, start('w$) AS window_start, end('w$) AS
> window_end])
> +- Exchange(distribution=[hash[a]])
> +- Calc(select=[a, window_start, window_end, d, IS TRUE(>(b, 1000)) AS
> $f4, b, e, c], where=[>=(window_start, 2021-01-01 10:10:00)])
> +- WindowTableFunction(window=[SESSION(time_col=[rowtime], gap=[5
> min], partition keys=[a])])
> +- Exchange(distribution=[hash[a]])
> +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime,
> 1000:INTERVAL SECOND)])
> +- TableSourceScan(table=[[default_catalog,
> default_database, MyTable]], fields=[a, b, c, d, e, rowtime]) {code}
> If the node `WindowTableFunction`, `Calc` and `WindowAggregate` can be
> chained finally, theĀ `Exchange` between `Calc` and `WindowAggregate` can be
> removed.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)