lincoln lee created FLINK-32578:
-----------------------------------

             Summary: Cascaded group by window time columns on a proctime 
window aggregate may result hang for ever
                 Key: FLINK-32578
                 URL: https://issues.apache.org/jira/browse/FLINK-32578
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.17.1
            Reporter: lincoln lee
            Assignee: lincoln lee
             Fix For: 1.18.0, 1.17.2


Currently, grouping by the window time columns of a proctime window aggregate 
result produces a wrong plan, which may cause the job to hang forever at runtime.

For such a query:
{code}
insert into s1
SELECT
  window_start,
  window_end,
  sum(cnt),
  count(*)
FROM (
  SELECT
    a,
    b,
    window_start,
    window_end,
    count(*) as cnt,
    sum(d) as sum_d,
    max(d) as max_d
  FROM TABLE(TUMBLE(TABLE src1, DESCRIPTOR(proctime), INTERVAL '5' MINUTE))
  GROUP BY a, window_start, window_end, b
)
GROUP BY a, window_start, window_end
{code}
The inner proctime window works fine, but the outer one does not, because the 
wrong plan translates it into an unexpected event-time mode window operator:
{code}
Sink(table=[default_catalog.default_database.s1], fields=[ws, we, b, c])
+- Calc(select=[CAST(window_start AS TIMESTAMP(6)) AS ws, CAST(window_end AS TIMESTAMP(6)) AS we, CAST(EXPR$2 AS BIGINT) AS b, CAST(EXPR$3 AS BIGINT) AS c])
   +- WindowAggregate(groupBy=[a], window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[5 min])], select=[a, SUM(cnt) AS EXPR$2, COUNT(*) AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end])
      +- Exchange(distribution=[hash[a]])
         +- Calc(select=[a, window_start, window_end, cnt])
            +- WindowAggregate(groupBy=[a, b], window=[TUMBLE(time_col=[proctime], size=[5 min])], select=[a, b, COUNT(*) AS cnt, start('w$) AS window_start, end('w$) AS window_end])
               +- Exchange(distribution=[hash[a, b]])
                  +- Calc(select=[a, b, d, PROCTIME() AS proctime])
                     +- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a, b, d], metadata=[]]], fields=[a, b, d])
{code}
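
To reproduce, the two tables referenced by the query could be declared roughly 
as follows. This is only a sketch: the column types of src1 and the datagen and 
blackhole connectors are assumptions, not part of the report. The table names, 
the source columns (a, b, d, proctime) and the sink field names and types are 
taken from the plan above.
{code}
-- Hypothetical source table: column types and connector are assumptions.
-- proctime is a computed processing-time column, matching
-- "PROCTIME() AS proctime" in the plan.
CREATE TABLE src1 (
  a INT,
  b BIGINT,
  d BIGINT,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

-- Sink table: field names and types follow the CASTs in the plan;
-- the connector is an assumption.
CREATE TABLE s1 (
  ws TIMESTAMP(6),
  we TIMESTAMP(6),
  b BIGINT,
  c BIGINT
) WITH (
  'connector' = 'blackhole'
);
{code}
With tables like these in place, prefixing the insert statement with EXPLAIN 
PLAN FOR on an affected version should print a plan of the shape shown above.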



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
