[ https://issues.apache.org/jira/browse/FLINK-21290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jark Wu reassigned FLINK-21290:
-------------------------------
Assignee: Jing Zhang
> Support Projection push down for Window TVF
> -------------------------------------------
>
> Key: FLINK-21290
> URL: https://issues.apache.org/jira/browse/FLINK-21290
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / Planner
> Reporter: Jark Wu
> Assignee: Jing Zhang
> Priority: Major
>
> {code:scala}
> @Test
> def testTumble_ProjectionPushDown(): Unit = {
> // TODO: [b, c, e, proctime] are never used, should be pruned
> val sql =
> """
> |SELECT
> | a,
> | window_start,
> | window_end,
> | count(*),
> | sum(d)
> |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
> |GROUP BY a, window_start, window_end
> """.stripMargin
> util.verifyRelPlan(sql)
> }
> {code}
> For the above test, the planner currently produces the following plan:
> {code}
> Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4])
> +- WindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[rowtime], size=[15 min])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, start('w$) AS window_start, end('w$) AS window_end])
>    +- Exchange(distribution=[hash[a]])
>       +- Calc(select=[a, d, rowtime])
>          +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
>             +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS proctime])
>                +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e, rowtime])
> {code}
> The planner should be able to prune the unused fields and produce the following plan:
> {code}
> Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4])
> +- WindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[rowtime], size=[15 min])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, start('w$) AS window_start, end('w$) AS window_end])
>    +- Exchange(distribution=[hash[a]])
>       +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
>          +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, d, rowtime])
> {code}
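The pruned scan fields [a, d, rowtime] follow from propagating required columns from the aggregate down toward the source. The sketch below models that propagation in plain Scala under simplifying assumptions: it is not Flink planner code, and `Step`, `requiredAtSource` and `prunedScan` are hypothetical names. Each step lists the columns it generates and the input columns it reads; walking top-down, generated columns leave the requirement and read columns join it.

{code:scala}
// Hypothetical model of top-down required-column propagation; not Flink planner code.
object RequiredFieldsSketch {
  // One operator between the aggregate and the source: the columns it
  // generates itself and the input columns it reads (forwarded columns omitted).
  final case class Step(name: String, generates: Set[String], reads: Set[String])

  // Walk from the aggregate toward the source. A generated column is satisfied
  // by its step and is no longer required below it; the step's reads are added.
  def requiredAtSource(topNeeds: Set[String], steps: List[Step]): Set[String] =
    steps.foldLeft(topNeeds) { (required, step) =>
      (required -- step.generates) ++ step.reads
    }

  // Columns referenced by GROUP BY a, window_start, window_end and SUM(d).
  val aggregateNeeds: Set[String] = Set("a", "window_start", "window_end", "d")

  // Operators below the aggregate, top to bottom, as in the logical plan.
  val steps: List[Step] = List(
    Step("TUMBLE", Set("window_start", "window_end", "window_time"), Set("rowtime")),
    Step("WatermarkAssigner", Set.empty, Set("rowtime")),
    Step("Calc(PROCTIME())", Set("proctime"), Set.empty)
  )

  // Physical source columns, kept in declaration order.
  val sourceColumns: List[String] = List("a", "b", "c", "d", "e", "rowtime")

  val prunedScan: List[String] = {
    val required = requiredAtSource(aggregateNeeds, steps)
    sourceColumns.filter(required.contains)
  }

  def main(args: Array[String]): Unit =
    println(prunedScan) // List(a, d, rowtime)
}
{code}

In this model b, c, e and proctime are never required, so only a, d and rowtime reach the scan, which matches the expected plan.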
> The reason is that the Project and the WindowTableFunction are not transposed in the logical phase:
> {code}
> LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)])
> +- LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3])
>    +- LogicalTableFunctionScan(invocation=[TUMBLE($6, DESCRIPTOR($5), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(ROWTIME) window_time)])
>       +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6])
>          +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
>             +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()])
>                +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
> {code}
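Transposing the Project with the window TVF amounts to narrowing the TVF input to the fields that are actually used (plus the DESCRIPTOR time column), then rewriting the upper Project's field indices against the narrower row. The following is a minimal sketch of only that index arithmetic, in plain Scala; `Transposed` and `transpose` are hypothetical names, not a Calcite or Flink rule, and it assumes the TVF row is its input row followed by the appended window columns, as in the rowType above.

{code:scala}
// Hypothetical sketch of pushing a Project below a window TVF; not Flink planner code.
object TransposeSketch {
  // Result of the transposition:
  //  lowerProject: input field indices kept below the TVF, in original order
  //  timeCol:      index of the DESCRIPTOR column in the narrowed input
  //  upperProject: upper Project indices rewritten against the narrowed TVF row
  final case class Transposed(lowerProject: List[Int], timeCol: Int, upperProject: List[Int])

  // Assumes the TVF row is its input row followed by the appended window
  // columns (window_start, window_end, window_time).
  def transpose(inputWidth: Int, timeCol: Int, upper: List[Int]): Transposed = {
    // Input fields the upper Project forwards, plus the time column the TVF reads.
    val kept = (upper.filter(_ < inputWidth) :+ timeCol).distinct.sorted
    val newIndex: Map[Int, Int] = kept.zipWithIndex.toMap
    val rewritten = upper.map { i =>
      if (i < inputWidth) newIndex(i)   // forwarded input field
      else kept.size + (i - inputWidth) // appended window column, shifted left
    }
    Transposed(kept, newIndex(timeCol), rewritten)
  }

  // From the logical plan: 7 input fields (a, b, c, d, e, rowtime, proctime),
  // DESCRIPTOR($5), and LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3]).
  val example: Transposed = transpose(inputWidth = 7, timeCol = 5, upper = List(0, 7, 8, 3))

  def main(args: Array[String]): Unit =
    println(example)
}
{code}

For the plan above, this keeps fields 0, 3 and 5 (a, d, rowtime) below TUMBLE, moves the descriptor column to $2, and rewrites the upper Project to a=[$0], window_start=[$3], window_end=[$4], d=[$1]; b, c, e and proctime are then free to be pruned further down.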
--
This message was sent by Atlassian Jira
(v8.3.4#803005)