[ https://issues.apache.org/jira/browse/FLINK-32501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
lincoln lee resolved FLINK-32501.
---------------------------------
Resolution: Fixed
Fixed in master: 1fc3b3746b60ad1636f77fd102444ebaa03bdc3f
> Wrong execution plan of a proctime window aggregation generated due to incorrect cost evaluation
> ------------------------------------------------------------------------------------------------
>
> Key: FLINK-32501
> URL: https://issues.apache.org/jira/browse/FLINK-32501
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.16.2, 1.17.1
> Reporter: lincoln lee
> Assignee: lincoln lee
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.18.0, 1.17.2
>
>
> Currently, a window aggregation that references a windowing TVF with a filter
> condition may get a wrong plan, which can hang forever at runtime (the window
> aggregate operator never emits any output), for example:
> {code}
> insert into sink
> select
>   window_start,
>   window_end,
>   b,
>   COALESCE(sum(case when a = 11 then 1 end), 0) c
> from
>   TABLE(
>     TUMBLE(TABLE source, DESCRIPTOR(proctime), INTERVAL '10' SECONDS)
>   )
> where
>   a in (1, 5, 7, 9, 11)
> GROUP BY
>   window_start, window_end, b
> {code}
> The planner generates a wrong plan that does not merge the proctime
> WindowTableFunction into the WindowAggregate. When this plan is translated to
> the execution plan, the WindowAggregate wrongly treats the window as an
> event-time window, so the WindowAggregateOperator neither receives watermarks
> nor sets up timers, and no window ever fires at runtime:
> {code}
> Sink(table=[default_catalog.default_database.sink], fields=[ws, we, b, c])
> +- Calc(select=[CAST(window_start AS TIMESTAMP(6)) AS ws, CAST(window_end AS TIMESTAMP(6)) AS we, b, CAST(COALESCE($f1, 0) AS BIGINT) AS c])
>    +- WindowAggregate(groupBy=[b], window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[10 s])], select=[b, SUM($f3) AS $f1, start('w$) AS window_start, end('w$) AS window_end])
>       +- Exchange(distribution=[hash[b]])
>          +- Calc(select=[window_start, window_end, b, CASE((a = 11), 1, null:INTEGER) AS $f3], where=[SEARCH(a, Sarg[1, 5, 7, 9, 11])])
>             +- WindowTableFunction(window=[TUMBLE(time_col=[proctime], size=[10 s])])
>                +- Calc(select=[a, b, PROCTIME() AS proctime])
>                   +- TableSourceScan(table=[[default_catalog, default_database, source, project=[a, b], metadata=[]]], fields=[a, b])
> {code}
> Expected plan:
> {code}
> Sink(table=[default_catalog.default_database.sink], fields=[ws, we, b, c])
> +- Calc(select=[CAST(window_start AS TIMESTAMP(6)) AS ws, CAST(window_end AS TIMESTAMP(6)) AS we, b, CAST(COALESCE($f1, 0) AS BIGINT) AS c])
>    +- WindowAggregate(groupBy=[b], window=[TUMBLE(time_col=[proctime], size=[10 s])], select=[b, SUM($f3) AS $f1, start('w$) AS window_start, end('w$) AS window_end])
>       +- Exchange(distribution=[hash[b]])
>          +- Calc(select=[b, CASE((a = 11), 1, null:INTEGER) AS $f3, PROCTIME() AS proctime], where=[SEARCH(a, Sarg[1, 5, 7, 9, 11])])
>             +- TableSourceScan(table=[[default_catalog, default_database, source, project=[a, b], metadata=[]]], fields=[a, b])
> {code}
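>
> The issue does not include DDL for source and sink. The following is a minimal
> reproduction sketch. It assumes a datagen source with a computed proctime
> column and a print sink. The column types (in particular b STRING) are inferred
> from the plans above and are not the reporter's actual tables. On an affected
> version (1.16.2 or 1.17.1), prefix the insert statement above with EXPLAIN to
> print the optimized plan and compare it with the wrong and expected plans. The
> issue does not confirm that this exact setup triggers the wrong plan, and
> details such as the scan node description may differ.
> {code:sql}
> -- Hypothetical source: columns a, b plus a processing-time attribute,
> -- matching Calc(select=[a, b, PROCTIME() AS proctime]) in the wrong plan.
> CREATE TABLE source (
>   a INT,
>   b STRING,                -- assumed type; the issue does not state it
>   proctime AS PROCTIME()   -- computed processing-time column, no watermark
> ) WITH (
>   'connector' = 'datagen',
>   'fields.a.min' = '1',    -- keep a small so the IN (1, 5, 7, 9, 11) filter matches rows
>   'fields.a.max' = '11',
>   'fields.b.length' = '1'  -- few distinct group keys for b
> );
>
> -- Hypothetical sink matching fields=[ws, we, b, c] and the casts in the top Calc.
> CREATE TABLE sink (
>   ws TIMESTAMP(6),
>   we TIMESTAMP(6),
>   b STRING,
>   c BIGINT
> ) WITH (
>   'connector' = 'print'
> );
> {code}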
--
This message was sent by Atlassian Jira
(v8.20.10#820010)