lincoln lee created FLINK-35816:
-----------------------------------
Summary: Non-mergeable proctime tvf window aggregate needs to
fallback to group aggregate
Key: FLINK-35816
URL: https://issues.apache.org/jira/browse/FLINK-35816
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.19.1, 1.20.0
Reporter: lincoln lee
Assignee: lincoln lee
Fix For: 1.20.0
A non-mergeable proctime TVF window aggregate needs to fall back to a group
aggregate. For example:
{code}
select c, count(a)
from
TABLE(CUMULATE(table MyTable, DESCRIPTOR(proctime), interval '10' seconds,
interval '5' minutes))
where window_start <> '123'
group by window_start, window_end, c, window_time
{code}
The window property in the above query is materialized before the aggregation,
so it loses its processing-time attribute. This causes the planner to fail to
pull up `StreamPhysicalWindowTableFunction` into
`StreamPhysicalWindowAggregate` and generate a valid execution plan. Instead it
produces a plan like the following, which uses the attached window strategy;
that strategy relies on the upstream watermark, but the plan lacks a watermark
assigner for it:
{code}
Calc(select=[c, EXPR$1])
+- WindowAggregate(groupBy=[c], window=[CUMULATE(win_start=[window_start], win_end=[window_end], max_size=[5 min], step=[10 s])], select=[c, COUNT(a) AS EXPR$1, start('w$) AS window_start, end('w$) AS window_end, proctime('w$) AS window_time])
   +- Exchange(distribution=[hash[c]])
      +- Calc(select=[window_start, window_end, c, window_time, a], where=[<>(window_start, '123')])
         +- WindowTableFunction(window=[CUMULATE(time_col=[proctime], max_size=[5 min], step=[10 s])])
            +- Calc(select=[a, c, proctime])
               +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
                  +- Calc(select=[a, c, PROCTIME() AS proctime, rowtime])
                     +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, c, rowtime], metadata=[]]], fields=[a, c, rowtime])
{code}
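The problematic shape can be recognized from the plan text alone: a
`WindowAggregate` whose window spec refers to `win_start`/`win_end` (the
attached strategy) sitting above a standalone `WindowTableFunction` on the
proctime column. The following is a minimal sketch of such a check, not part of
Flink; the helper name is hypothetical, the patterns are derived only from the
plan text shown in this issue, and it assumes the processing-time column is
literally named `proctime` as in the example.

```python
import re

# Hypothetical helper, not a Flink API: inspects EXPLAIN-style plan text.
def has_unmerged_proctime_window_agg(plan: str) -> bool:
    """Return True if the plan contains a WindowAggregate using the attached
    window strategy (win_start/win_end) together with a standalone
    WindowTableFunction whose time column is named `proctime`."""
    # Attached strategy: the aggregate's window spec starts with win_start=...
    attached_agg = re.search(
        r"WindowAggregate\(.*?window=\[\w+\(win_start=", plan, re.S
    )
    # The window TVF was not pulled up into the aggregate.
    # Assumption: the proctime column is called "proctime".
    proctime_tvf = re.search(
        r"WindowTableFunction\(window=\[\w+\(time_col=\[proctime\]", plan
    )
    return bool(attached_agg and proctime_tvf)
```

Applied to the plan above, the check should flag it; a plan that aggregates
with `GroupAggregate` over the same window table function should not be
flagged.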
So, semantically, when the window time attribute is materialized after the
window table function, the downstream aggregation should use a group
aggregate. The expected plan for the above example would be:
{code}
Calc(select=[c, EXPR$1])
+- GroupAggregate(groupBy=[window_start, window_end, c, window_time], select=[window_start, window_end, c, window_time, COUNT(a) AS EXPR$1])
   +- Exchange(distribution=[hash[window_start, window_end, c, window_time]])
      +- Calc(select=[window_start, window_end, c, window_time, a], where=[<>(window_start, '123')])
         +- WindowTableFunction(window=[CUMULATE(time_col=[proctime], max_size=[5 min], step=[10 s])])
            +- Calc(select=[a, c, proctime])
               +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
                  +- Calc(select=[a, c, PROCTIME() AS proctime, rowtime])
                     +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, c, rowtime], metadata=[]]], fields=[a, c, rowtime])
{code}
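The fallback described in this issue can be summarized as a small decision
rule. The sketch below is a simplified model for illustration only, not the
planner's actual code: the enum, the function name, and the `mergeable` flag
are hypothetical, and the rowtime branch is an assumption inferred from the
statement that the attached strategy relies on upstream watermarks.

```python
from enum import Enum

class AggPlan(Enum):
    WINDOW_AGG_MERGED = "window aggregate with the window TVF pulled up"
    WINDOW_AGG_ATTACHED = "window aggregate with attached window strategy"
    GROUP_AGG = "regular group aggregate"

def choose_aggregate(time_attr: str, mergeable: bool) -> AggPlan:
    """Toy model of the proposed behavior.

    time_attr: "proctime" or "rowtime", the time attribute of the window TVF.
    mergeable: whether the window TVF can be pulled up into the aggregate,
               i.e. the window properties are not materialized in between.
    """
    if mergeable:
        return AggPlan.WINDOW_AGG_MERGED
    if time_attr == "rowtime":
        # Assumption: with event time, upstream watermarks can drive the
        # attached window strategy.
        return AggPlan.WINDOW_AGG_ATTACHED
    # Non-mergeable proctime: the processing-time attribute is lost, so fall
    # back to a group aggregate, as this issue proposes.
    return AggPlan.GROUP_AGG
```

For the example query, `choose_aggregate("proctime", mergeable=False)` yields
`AggPlan.GROUP_AGG`, matching the expected plan above.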