[ https://issues.apache.org/jira/browse/FLINK-21290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jark Wu reassigned FLINK-21290:
-------------------------------

    Assignee: Jing Zhang

> Support Projection push down for Window TVF
> -------------------------------------------
>
>                 Key: FLINK-21290
>                 URL: https://issues.apache.org/jira/browse/FLINK-21290
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / Planner
>            Reporter: Jark Wu
>            Assignee: Jing Zhang
>            Priority: Major
>
> {code:scala}
>   @Test
>   def testTumble_ProjectionPushDown(): Unit = {
>     // TODO: [b, c, e, proctime] are never used, should be pruned
>     val sql =
>       """
>         |SELECT
>         |   a,
>         |   window_start,
>         |   window_end,
>         |   count(*),
>         |   sum(d)
>         |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
>         |GROUP BY a, window_start, window_end
>       """.stripMargin
>     util.verifyRelPlan(sql)
>   }
> {code}
> For the test above, the planner currently produces the following plan:
> {code}
> Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4])
> +- WindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[rowtime], size=[15 min])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, start('w$) AS window_start, end('w$) AS window_end])
>    +- Exchange(distribution=[hash[a]])
>       +- Calc(select=[a, d, rowtime])
>          +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
>             +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS proctime])
>                +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e, rowtime])
> {code}
> The planner should be able to prune the unused fields and produce the following plan:
> {code}
> Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4])
> +- WindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[rowtime], size=[15 min])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, start('w$) AS window_start, end('w$) AS window_end])
>    +- Exchange(distribution=[hash[a]])
>       +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
>          +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, d, rowtime])
> {code}
> The reason is that we don't transpose the Project and the WindowTableFunction in the logical phase, so all input fields still flow into the window TVF, as the logical plan shows:
> {code}
> LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)])
> +- LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3])
>    +- LogicalTableFunctionScan(invocation=[TUMBLE($6, DESCRIPTOR($5), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(ROWTIME) window_time)])
>       +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6])
>          +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
>             +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()])
>                +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
> {code}
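To make the missing transpose concrete, here is a minimal, hypothetical Scala model of the index bookkeeping it would need, using the field indexes from the logical plan above. It assumes the window TVF appends window_start, window_end and window_time after its input fields and reads only the DESCRIPTOR time column from its input. `WindowTvfProjectTranspose`, `transpose` and `Rewrite` are illustrative names, not Flink planner APIs, and a real planner rule would also have to rebuild the TVF call and its row type.

```scala
// Hypothetical model (not a Flink API): push a Project below a window TVF by
// computing which TVF input fields are needed and remapping the Project's indexes.
object WindowTvfProjectTranspose {
  // keptInputs: input field indexes the TVF must still receive (in input order)
  // newTimeCol: index of the DESCRIPTOR time column in the pruned input
  // newProject: Project expressions above the TVF, remapped to new indexes
  final case class Rewrite(keptInputs: Seq[Int], newTimeCol: Int, newProject: Seq[(String, Int)])

  def transpose(
      inputFieldCount: Int,        // number of fields fed into the TVF (a .. proctime)
      timeCol: Int,                // index used in DESCRIPTOR(...), e.g. $5 for rowtime
      project: Seq[(String, Int)]  // Project above the TVF: output name -> input index
  ): Rewrite = {
    // Indexes >= inputFieldCount refer to the window columns appended by the TVF,
    // so only smaller indexes (plus the time column the TVF reads) must be kept.
    val keptInputs: Seq[Int] =
      (project.map(_._2).filter(_ < inputFieldCount) :+ timeCol).distinct.sorted
    val inputMapping: Map[Int, Int] = keptInputs.zipWithIndex.toMap

    // Input fields move to their position among the kept fields; window columns
    // shift left because they now follow the pruned input instead of the full one.
    def remap(i: Int): Int =
      if (i < inputFieldCount) inputMapping(i)
      else keptInputs.size + (i - inputFieldCount)

    Rewrite(keptInputs, inputMapping(timeCol), project.map { case (n, i) => (n, remap(i)) })
  }

  def main(args: Array[String]): Unit = {
    // Indexes taken from LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3])
    // over TUMBLE(..., DESCRIPTOR($5), ...) with 7 input fields.
    val r = transpose(
      inputFieldCount = 7,
      timeCol = 5,
      project = Seq("a" -> 0, "window_start" -> 7, "window_end" -> 8, "d" -> 3))
    println(r)
  }
}
```

With these indexes the model keeps inputs 0, 3 and 5 (a, d, rowtime), so b, c, e and proctime (indexes 1, 2, 4, 6) could be pruned below the TVF; the time column moves to $2 and the Project on top becomes a=$0, window_start=$3, window_end=$4, d=$1.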



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
