[ 
https://issues.apache.org/jira/browse/FLINK-32501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee updated FLINK-32501:
--------------------------------
    Fix Version/s:     (was: 1.17.2)

> Wrong execution plan of a proctime window aggregation generated due to 
> incorrect cost evaluation
> ------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-32501
>                 URL: https://issues.apache.org/jira/browse/FLINK-32501
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.16.2, 1.17.1
>            Reporter: lincoln lee
>            Assignee: lincoln lee
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.18.0
>
>
> Currently, a window aggregation over a windowing TVF combined with a 
> filter condition may produce a wrong plan that hangs forever at 
> runtime (the window aggregate operator never emits output).
> For example:
> {code}
> insert into sink
>     select
>         window_start,
>         window_end,
>         b,
>         COALESCE(sum(case
>             when a = 11
>             then 1
>         end), 0) c
>     from
>         TABLE(
>             TUMBLE(TABLE source, DESCRIPTOR(proctime), INTERVAL '10' SECONDS)
>         )
>     where
>         a in (1, 5, 7, 9, 11)
>     GROUP BY
>         window_start, window_end, b
> {code}
> The planner generates a wrong plan that does not merge the proctime 
> WindowTableFunction into the WindowAggregate. When this is translated to 
> the execution plan, the WindowAggregate wrongly treats the window as an 
> event-time window, so the WindowAggregateOperator neither receives 
> watermarks nor sets up timers, and no window ever fires at runtime:
> {code}
> Sink(table=[default_catalog.default_database.sink], fields=[ws, we, b, c])
> +- Calc(select=[CAST(window_start AS TIMESTAMP(6)) AS ws, CAST(window_end AS TIMESTAMP(6)) AS we, b, CAST(COALESCE($f1, 0) AS BIGINT) AS c])
>    +- WindowAggregate(groupBy=[b], window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[10 s])], select=[b, SUM($f3) AS $f1, start('w$) AS window_start, end('w$) AS window_end])
>       +- Exchange(distribution=[hash[b]])
>          +- Calc(select=[window_start, window_end, b, CASE((a = 11), 1, null:INTEGER) AS $f3], where=[SEARCH(a, Sarg[1, 5, 7, 9, 11])])
>             +- WindowTableFunction(window=[TUMBLE(time_col=[proctime], size=[10 s])])
>                +- Calc(select=[a, b, PROCTIME() AS proctime])
>                   +- TableSourceScan(table=[[default_catalog, default_database, source, project=[a, b], metadata=[]]], fields=[a, b])
> {code}
> Expected plan:
> {code}
> Sink(table=[default_catalog.default_database.sink], fields=[ws, we, b, c])
> +- Calc(select=[CAST(window_start AS TIMESTAMP(6)) AS ws, CAST(window_end AS TIMESTAMP(6)) AS we, b, CAST(COALESCE($f1, 0) AS BIGINT) AS c])
>    +- WindowAggregate(groupBy=[b], window=[TUMBLE(time_col=[proctime], size=[10 s])], select=[b, SUM($f3) AS $f1, start('w$) AS window_start, end('w$) AS window_end])
>       +- Exchange(distribution=[hash[b]])
>          +- Calc(select=[b, CASE((a = 11), 1, null:INTEGER) AS $f3, PROCTIME() AS proctime], where=[SEARCH(a, Sarg[1, 5, 7, 9, 11])])
>             +- TableSourceScan(table=[[default_catalog, default_database, source, project=[a, b], metadata=[]]], fields=[a, b])
> {code}
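
A minimal sketch of a setup that could be used to inspect the plan for the query above. The table definitions are not part of the report: the types of `a` and `b`, the `datagen` and `blackhole` connectors, and the datagen options are assumptions chosen for illustration. Only the sink column names and the `TIMESTAMP(6)`/`BIGINT` types are taken from the plans. The printed plan may differ in detail (for example in the source scan node) depending on the connector and Flink version.

```sql
-- Hypothetical source table: column types and connector are assumptions.
CREATE TABLE source (
    a INT,
    b STRING,
    proctime AS PROCTIME()          -- processing-time attribute used by TUMBLE
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '10',
    'fields.a.min' = '1',
    'fields.a.max' = '11'
);

-- Hypothetical sink table: ws, we and c follow the casts shown in the plans;
-- the type of b is an assumption.
CREATE TABLE sink (
    ws TIMESTAMP(6),
    we TIMESTAMP(6),
    b  STRING,
    c  BIGINT
) WITH (
    'connector' = 'blackhole'
);

-- Print the optimized plan instead of running the job.
EXPLAIN PLAN FOR
insert into sink
    select
        window_start,
        window_end,
        b,
        COALESCE(sum(case
            when a = 11
            then 1
        end), 0) c
    from
        TABLE(
            TUMBLE(TABLE source, DESCRIPTOR(proctime), INTERVAL '10' SECONDS)
        )
    where
        a in (1, 5, 7, 9, 11)
    GROUP BY
        window_start, window_end, b;
```

In the output, a separate `WindowTableFunction` node below the `WindowAggregate` would indicate the wrong plan described above, while a `WindowAggregate` whose window spec contains `time_col=[proctime]` matches the expected plan.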



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
