[
https://issues.apache.org/jira/browse/FLINK-34238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17811180#comment-17811180
]
Benchao Li commented on FLINK-34238:
------------------------------------
Make sense to me. Another general way is to let {{FlinkExpandConversionRule}}
to remove shuffle via distribution trait, which is now used only in batch. (I
know it needs a lot of efforts, since many rules in streaming need to adapt,
we've done this work internally, hopefully someone could contribute this back)
> In streaming mode, redundant exchange nodes can be optimally deleted in some
> cases
> ----------------------------------------------------------------------------------
>
> Key: FLINK-34238
> URL: https://issues.apache.org/jira/browse/FLINK-34238
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Planner
> Reporter: xuyang
> Priority: Minor
>
> Take the following plan as an example:
> {code:java}
> Calc(select=[window_start, window_end, a, EXPR$3, EXPR$4, EXPR$5, wAvg, uv])
> +- WindowAggregate(groupBy=[a], window=[SESSION(win_start=[window_start],
> win_end=[window_end], gap=[5 min], partition keys=[a])], select=[a, COUNT(*)
> AS EXPR$3, SUM(d) AS EXPR$4, MAX(d) FILTER $f4 AS EXPR$5, weightedAvg(b, e)
> AS wAvg, COUNT(DISTINCT c) AS uv, start('w$) AS window_start, end('w$) AS
> window_end])
> +- Exchange(distribution=[hash[a]])
> +- Calc(select=[a, window_start, window_end, d, IS TRUE(>(b, 1000)) AS
> $f4, b, e, c], where=[>=(window_start, 2021-01-01 10:10:00)])
> +- WindowTableFunction(window=[SESSION(time_col=[rowtime], gap=[5
> min], partition keys=[a])])
> +- Exchange(distribution=[hash[a]])
> +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime,
> 1000:INTERVAL SECOND)])
> +- TableSourceScan(table=[[default_catalog,
> default_database, MyTable]], fields=[a, b, c, d, e, rowtime]) {code}
> If the node `WindowTableFunction`, `Calc` and `WindowAggregate` can be
> chained finally, theĀ `Exchange` between `Calc` and `WindowAggregate` can be
> removed.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)