[ https://issues.apache.org/jira/browse/FLINK-35816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
lincoln lee updated FLINK-35816:
--------------------------------
Fix Version/s: 1.20.0
> Non-mergeable proctime tvf window aggregate needs to fallback to group aggregate
> --------------------------------------------------------------------------------
>
> Key: FLINK-35816
> URL: https://issues.apache.org/jira/browse/FLINK-35816
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.20.0, 1.19.1
> Reporter: lincoln lee
> Assignee: lincoln lee
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.20.0
>
>
> A non-mergeable proctime TVF window aggregate needs to fall back to a group
> aggregate. For example:
> {code:java}
> select c, count(a)
> from
> TABLE(CUMULATE(table MyTable, DESCRIPTOR(proctime), interval '10' seconds,
> interval '5' minutes))
> where window_start <> '123'
> group by window_start, window_end, c, window_time
> {code}
> In the query above, the window properties are materialized before the
> aggregation, so the processing-time attribute is lost. This prevents the
> planner from pulling `StreamPhysicalWindowTableFunction` up into
> `StreamPhysicalWindowAggregate` to produce a valid execution plan. Instead it
> generates the following plan, which uses the attached window strategy. That
> strategy relies on an upstream watermark, but the plan lacks a corresponding
> watermark assigner:
> {code:java}
> Calc(select=[c, EXPR$1])
> +- WindowAggregate(groupBy=[c], window=[CUMULATE(win_start=[window_start], win_end=[window_end], max_size=[5 min], step=[10 s])], select=[c, COUNT(a) AS EXPR$1, start('w$) AS window_start, end('w$) AS window_end, proctime('w$) AS window_time])
>    +- Exchange(distribution=[hash[c]])
>       +- Calc(select=[window_start, window_end, c, window_time, a], where=[<>(window_start, '123')])
>          +- WindowTableFunction(window=[CUMULATE(time_col=[proctime], max_size=[5 min], step=[10 s])])
>             +- Calc(select=[a, c, proctime])
>                +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
>                   +- Calc(select=[a, c, PROCTIME() AS proctime, rowtime])
>                      +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, c, rowtime], metadata=[]]], fields=[a, c, rowtime])
> {code}
> Semantically, when the window time attribute has been materialized after the
> window table function, the downstream aggregation should be a group
> aggregation. The expected plan for the example above is:
> {code:java}
> Calc(select=[c, EXPR$1])
> +- GroupAggregate(groupBy=[window_start, window_end, c, window_time], select=[window_start, window_end, c, window_time, COUNT(a) AS EXPR$1])
>    +- Exchange(distribution=[hash[window_start, window_end, c, window_time]])
>       +- Calc(select=[window_start, window_end, c, window_time, a], where=[<>(window_start, '123')])
>          +- WindowTableFunction(window=[CUMULATE(time_col=[proctime], max_size=[5 min], step=[10 s])])
>             +- Calc(select=[a, c, proctime])
>                +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
>                   +- Calc(select=[a, c, PROCTIME() AS proctime, rowtime])
>                      +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, c, rowtime], metadata=[]]], fields=[a, c, rowtime])
> {code}
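The fallback rule described above can be modelled as a small decision function. This is a hypothetical sketch, not Flink's planner code: the names `WindowAggFallback`, `WindowInput`, `AggregateStrategy` and `chooseStrategy` are illustrative assumptions, and the real planner works on Calcite relational nodes and time-attribute types rather than boolean flags. Rowtime windows are outside the scope of this issue, so the sketch simply keeps the window aggregate for them.

{code:java}
// Hypothetical model of the FLINK-35816 fallback rule; NOT the Flink planner API.
public final class WindowAggFallback {

    /** Illustrative physical strategies for the aggregation. */
    enum AggregateStrategy { WINDOW_AGGREGATE, GROUP_AGGREGATE }

    /**
     * Simplified description of the aggregate's input (assumed flags).
     *
     * @param proctimeWindow    true if the window TVF is defined on a processing-time column
     * @param timeAttributeKept true if the window time attribute survives up to the
     *                          aggregation, i.e. it was not materialized in between
     */
    record WindowInput(boolean proctimeWindow, boolean timeAttributeKept) {}

    static AggregateStrategy chooseStrategy(WindowInput input) {
        // A proctime window TVF can only be merged into the window aggregate while the
        // time attribute is intact; once it is materialized, fall back to group aggregate.
        if (input.proctimeWindow() && !input.timeAttributeKept()) {
            return AggregateStrategy.GROUP_AGGREGATE;
        }
        // Mergeable proctime windows, and rowtime windows (not modelled here),
        // keep the window aggregate in this sketch.
        return AggregateStrategy.WINDOW_AGGREGATE;
    }

    public static void main(String[] args) {
        // The query in this issue: proctime CUMULATE window with materialized properties.
        System.out.println(chooseStrategy(new WindowInput(true, false)));  // GROUP_AGGREGATE
        // A proctime window whose TVF can still be merged into the aggregate.
        System.out.println(chooseStrategy(new WindowInput(true, true)));   // WINDOW_AGGREGATE
    }
}
{code}

Keeping the decision in a single predicate mirrors the issue's point: materialization of the window time attribute, not the window type itself, is what makes the proctime window aggregate non-mergeable.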
--
This message was sent by Atlassian Jira
(v8.20.10#820010)