[ https://issues.apache.org/jira/browse/FLINK-32501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
lincoln lee updated FLINK-32501:
--------------------------------
    Fix Version/s:     (was: 1.17.2)

> Wrong execution plan of a proctime window aggregation generated due to incorrect cost evaluation
> ------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-32501
>                 URL: https://issues.apache.org/jira/browse/FLINK-32501
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.16.2, 1.17.1
>            Reporter: lincoln lee
>            Assignee: lincoln lee
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.18.0
>
>
> Currently, a window aggregation that references a windowing TVF with a
> filter condition may get a wrong plan that hangs forever at runtime
> (the window aggregate operator never emits any output).
> For a case such as:
> {code}
> insert into sink
> select
>     window_start,
>     window_end,
>     b,
>     COALESCE(sum(case
>         when a = 11
>         then 1
>     end), 0) c
> from
>     TABLE(
>         TUMBLE(TABLE source, DESCRIPTOR(proctime), INTERVAL '10' SECONDS)
>     )
> where
>     a in (1, 5, 7, 9, 11)
> GROUP BY
>     window_start, window_end, b
> {code}
> the planner generates a wrong plan that does not merge the proctime
> WindowTableFunction into the WindowAggregate. As a result, when the plan is
> translated to an execution plan, the WindowAggregate wrongly treats the
> window as an event-time window, and the WindowAggregateOperator neither
> receives watermarks nor sets up timers to fire any windows at runtime:
> {code}
> Sink(table=[default_catalog.default_database.sink], fields=[ws, we, b, c])
> +- Calc(select=[CAST(window_start AS TIMESTAMP(6)) AS ws, CAST(window_end AS TIMESTAMP(6)) AS we, b, CAST(COALESCE($f1, 0) AS BIGINT) AS c])
>    +- WindowAggregate(groupBy=[b], window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[10 s])], select=[b, SUM($f3) AS $f1, start('w$) AS window_start, end('w$) AS window_end])
>       +- Exchange(distribution=[hash[b]])
>          +- Calc(select=[window_start, window_end, b, CASE((a = 11), 1, null:INTEGER) AS $f3], where=[SEARCH(a, Sarg[1, 5, 7, 9, 11])])
>             +- WindowTableFunction(window=[TUMBLE(time_col=[proctime], size=[10 s])])
>                +- Calc(select=[a, b, PROCTIME() AS proctime])
>                   +- TableSourceScan(table=[[default_catalog, default_database, source, project=[a, b], metadata=[]]], fields=[a, b])
> {code}
> Expected plan:
> {code}
> Sink(table=[default_catalog.default_database.sink], fields=[ws, we, b, c])
> +- Calc(select=[CAST(window_start AS TIMESTAMP(6)) AS ws, CAST(window_end AS TIMESTAMP(6)) AS we, b, CAST(COALESCE($f1, 0) AS BIGINT) AS c])
>    +- WindowAggregate(groupBy=[b], window=[TUMBLE(time_col=[proctime], size=[10 s])], select=[b, SUM($f3) AS $f1, start('w$) AS window_start, end('w$) AS window_end])
>       +- Exchange(distribution=[hash[b]])
>          +- Calc(select=[b, CASE((a = 11), 1, null:INTEGER) AS $f3, PROCTIME() AS proctime], where=[SEARCH(a, Sarg[1, 5, 7, 9, 11])])
>             +- TableSourceScan(table=[[default_catalog, default_database, source, project=[a, b], metadata=[]]], fields=[a, b])
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)