[ 
https://issues.apache.org/jira/browse/FLINK-35816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee updated FLINK-35816:
--------------------------------
    Description: 
A non-mergeable proctime TVF window aggregate needs to fall back to a group 
aggregate.

For example:
{code:java}
select c, count(a)
from
   TABLE(CUMULATE(table MyTable, DESCRIPTOR(proctime), interval '10' seconds, interval '5' minutes))
where window_start <> '123'
group by window_start, window_end, c, window_time
{code}
The window property in the above query is materialized before the aggregation, 
so it loses its processing-time attribute. As a result, the planner fails to 
pull up `StreamPhysicalWindowTableFunction` into 
`StreamPhysicalWindowAggregate` and cannot generate a valid execution plan. It 
produces the following plan instead, which takes the attached window strategy; 
that strategy relies on an upstream watermark, but the plan lacks a watermark 
assigner:
{code:java}
Calc(select=[c, EXPR$1])
+- WindowAggregate(groupBy=[c], window=[CUMULATE(win_start=[window_start], win_end=[window_end], max_size=[5 min], step=[10 s])], select=[c, COUNT(a) AS EXPR$1, start('w$) AS window_start, end('w$) AS window_end, proctime('w$) AS window_time])
  +- Exchange(distribution=[hash[c]])
    +- Calc(select=[window_start, window_end, c, window_time, a], where=[<>(window_start, '123')])
      +- WindowTableFunction(window=[CUMULATE(time_col=[proctime], max_size=[5 min], step=[10 s])])
        +- Calc(select=[a, c, proctime])
          +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
            +- Calc(select=[a, c, PROCTIME() AS proctime, rowtime])
              +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, c, rowtime], metadata=[]]], fields=[a, c, rowtime])
{code}
So, semantically, when the window time attribute is materialized after the 
window table function, the downstream aggregation should use a group 
aggregate. The expected plan for the above example can be:
{code:java}
Calc(select=[c, EXPR$1])
+- GroupAggregate(groupBy=[window_start, window_end, c, window_time], select=[window_start, window_end, c, window_time, COUNT(a) AS EXPR$1])
  +- Exchange(distribution=[hash[window_start, window_end, c, window_time]])
    +- Calc(select=[window_start, window_end, c, window_time, a], where=[<>(window_start, '123')])
      +- WindowTableFunction(window=[CUMULATE(time_col=[proctime], max_size=[5 min], step=[10 s])])
        +- Calc(select=[a, c, proctime])
          +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
            +- Calc(select=[a, c, PROCTIME() AS proctime, rowtime])
              +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, c, rowtime], metadata=[]]], fields=[a, c, rowtime])
{code}
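To inspect the plan locally, a table along the following lines might be used. 
This is only a sketch inferred from the plans above: the column types of `a` 
and `c` and the `datagen` connector are assumptions, and the real `MyTable` 
may have more columns. Only the `PROCTIME()` computed column and the 
one-second watermark delay on `rowtime` are taken from the plan.
{code:sql}
-- Hypothetical reproduction table; column types and connector are assumed.
CREATE TABLE MyTable (
  a INT,                      -- assumed type
  c STRING,                   -- assumed type
  rowtime TIMESTAMP(3),
  proctime AS PROCTIME(),     -- matches "PROCTIME() AS proctime" in the plan
  WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND
) WITH (
  'connector' = 'datagen'
);

-- Print the optimized plan for the query from this issue.
EXPLAIN PLAN FOR
select c, count(a)
from
   TABLE(CUMULATE(table MyTable, DESCRIPTOR(proctime), interval '10' seconds, interval '5' minutes))
where window_start <> '123'
group by window_start, window_end, c, window_time;
{code}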


> Non-mergeable proctime tvf window aggregate needs to fallback to group 
> aggregate
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-35816
>                 URL: https://issues.apache.org/jira/browse/FLINK-35816
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.20.0, 1.19.1
>            Reporter: lincoln lee
>            Assignee: lincoln lee
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
