lincoln lee created FLINK-32501:
-----------------------------------

             Summary: Wrong execution plan of a proctime window aggregation 
generated due to incorrect cost evaluation
                 Key: FLINK-32501
                 URL: https://issues.apache.org/jira/browse/FLINK-32501
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.17.1, 1.16.2
            Reporter: lincoln lee
            Assignee: lincoln lee
             Fix For: 1.18.0, 1.17.2


Currently, a window aggregation that references a windowing TVF with a filter
condition may get a wrong plan that hangs forever at runtime (the window
aggregate operator never emits any output).

For example, take the following query:
{code}
insert into sink
    select
        window_start,
        window_end,
        b,
        COALESCE(sum(case
            when a = 11
            then 1
        end), 0) c
    from
        TABLE(
            TUMBLE(TABLE source, DESCRIPTOR(proctime), INTERVAL '10' SECONDS)
        )
    where
        a in (1, 5, 7, 9, 11)
    GROUP BY
        window_start, window_end, b
{code}
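
The report does not include the table definitions. A minimal sketch of DDL
that should be enough to plan the query is shown below. Everything in it is an
assumption for illustration: the column types (a INT, b STRING), the datagen
and blackhole connectors, and the sink schema, which is only inferred from the
CAST expressions in the plans that follow.
{code}
-- Hypothetical source: column types and connector are assumptions.
-- proctime is a computed processing-time attribute, matching the
-- "PROCTIME() AS proctime" Calc in the plans.
CREATE TABLE source (
    a INT,
    b STRING,
    proctime AS PROCTIME()
) WITH (
    'connector' = 'datagen'
);

-- Hypothetical sink: field names taken from the Sink node (ws, we, b, c),
-- types inferred from the CASTs in the plan.
CREATE TABLE sink (
    ws TIMESTAMP(6),
    we TIMESTAMP(6),
    b STRING,
    c BIGINT
) WITH (
    'connector' = 'blackhole'
);
{code}
With these tables in place, prefixing the INSERT statement with EXPLAIN PLAN
FOR prints the optimized plan without running the job, which makes it possible
to check whether a separate WindowTableFunction node is still present.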

For this query the planner generates a wrong plan in which the proctime
WindowTableFunction is not merged into the WindowAggregate. As a result, when
the plan is translated to an execution plan, the WindowAggregate wrongly
treats the window as an event-time window, so at runtime the
WindowAggregateOperator neither receives watermarks nor sets up timers to fire
any windows:
{code}
Sink(table=[default_catalog.default_database.sink], fields=[ws, we, b, c])
+- Calc(select=[CAST(window_start AS TIMESTAMP(6)) AS ws, CAST(window_end AS TIMESTAMP(6)) AS we, b, CAST(COALESCE($f1, 0) AS BIGINT) AS c])
   +- WindowAggregate(groupBy=[b], window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[10 s])], select=[b, SUM($f3) AS $f1, start('w$) AS window_start, end('w$) AS window_end])
      +- Exchange(distribution=[hash[b]])
         +- Calc(select=[window_start, window_end, b, CASE((a = 11), 1, null:INTEGER) AS $f3], where=[SEARCH(a, Sarg[1, 5, 7, 9, 11])])
            +- WindowTableFunction(window=[TUMBLE(time_col=[proctime], size=[10 s])])
               +- Calc(select=[a, b, PROCTIME() AS proctime])
                  +- TableSourceScan(table=[[default_catalog, default_database, source, project=[a, b], metadata=[]]], fields=[a, b])
{code}

Expected plan, with the proctime window merged into the WindowAggregate:
{code}
Sink(table=[default_catalog.default_database.sink], fields=[ws, we, b, c])
+- Calc(select=[CAST(window_start AS TIMESTAMP(6)) AS ws, CAST(window_end AS TIMESTAMP(6)) AS we, b, CAST(COALESCE($f1, 0) AS BIGINT) AS c])
   +- WindowAggregate(groupBy=[b], window=[TUMBLE(time_col=[proctime], size=[10 s])], select=[b, SUM($f3) AS $f1, start('w$) AS window_start, end('w$) AS window_end])
      +- Exchange(distribution=[hash[b]])
         +- Calc(select=[b, CASE((a = 11), 1, null:INTEGER) AS $f3, PROCTIME() AS proctime], where=[SEARCH(a, Sarg[1, 5, 7, 9, 11])])
            +- TableSourceScan(table=[[default_catalog, default_database, source, project=[a, b], metadata=[]]], fields=[a, b])
{code}






--
This message was sent by Atlassian Jira
(v8.20.10#820010)
