[
https://issues.apache.org/jira/browse/FLINK-5266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15727375#comment-15727375
]
Kurt Young commented on FLINK-5266:
-----------------------------------
Hi [~fhueske], I have checked your PR #2926, and it doesn't solve this case.
I used this query:
{code}
groupBy('name).select('acctbal.sum, 'name)
{code}
Ideally, we only need two fields from the source and can project them
right after the scan node. Furthermore, we can push the projection into
the scan if the source supports it. Currently, the AST and optimized plan look
like this:
{code}
== Abstract Syntax Tree ==
LogicalProject(TMP_1=[$1], name=[$0])
  LogicalAggregate(group=[{1}], TMP_0=[SUM($5)])
    LogicalTableScan(table=[[supplier]])
== Optimized Logical Plan ==
DataSetCalc(select=[TMP_0 AS TMP_1, name])
  DataSetAggregate(groupBy=[name], select=[name, SUM(acctbal) AS TMP_0])
    BatchTableSourceScan(table=[[supplier]])
{code}
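As a rough illustration of what "push the project into the scan" would mean for a source that supports it, here is a minimal, self-contained sketch. The `Source` trait, `InMemorySource`, and `projectFields` are hypothetical names made up for this example and are not Flink's TableSource API; the placeholder column names (`f0`, `f2`, ...) and the sample row are invented as well. Only `name` at index 1 and `acctbal` at index 5 come from the plan above.

```scala
object ProjectionPushDown {
  // Hypothetical source abstraction for illustration; not Flink's TableSource API.
  trait Source {
    def fieldNames: Seq[String]
    def rows: Iterator[Seq[Any]]
  }

  // A source that can narrow itself to a subset of its fields, so that
  // unused columns are never handed to downstream operators.
  final case class InMemorySource(fieldNames: Seq[String], data: Seq[Seq[Any]])
      extends Source {
    def rows: Iterator[Seq[Any]] = data.iterator

    // Keep only the fields at the given positions, in the given order.
    def projectFields(indices: Seq[Int]): InMemorySource =
      InMemorySource(indices.map(fieldNames), data.map(row => indices.map(row)))
  }

  def main(args: Array[String]): Unit = {
    // Invented sample data; only positions 1 (name) and 5 (acctbal) matter here.
    val supplier = InMemorySource(
      Seq("f0", "name", "f2", "f3", "f4", "acctbal", "f6"),
      Seq(Seq(1, "s1", "x", 10, "y", 100.0, "z")))

    // The aggregate only reads $1 and $5, so the scan can drop everything else.
    val narrowed = supplier.projectFields(Seq(1, 5))
    println(narrowed.fieldNames.mkString(", ")) // prints "name, acctbal"
  }
}
```

With a source like this, the projection never has to run as a separate operator: the scan itself emits only the two fields the aggregate needs.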
It seems we need an optimization rule like "push project past aggregator" or
"extract project from aggregate" if there is no projection. I think either of
these is non-trivial to implement. But when I use a SQL query, it seems
Calcite's parser has already extracted the projection node in the first place.
The query and plan look like this:
{code}
select sum(acctbal), name from supplier group by name
== Abstract Syntax Tree ==
LogicalProject(EXPR$0=[$1], name=[$0])
  LogicalAggregate(group=[{0}], EXPR$0=[SUM($1)])
    LogicalProject(name=[$1], acctbal=[$5])
      LogicalTableScan(table=[[supplier]])
== Optimized Logical Plan ==
DataSetCalc(select=[EXPR$0, name])
  DataSetAggregate(groupBy=[name], select=[name, SUM(acctbal) AS EXPR$0])
    DataSetCalc(select=[name, acctbal])
      BatchTableSourceScan(table=[[supplier]])
{code}
I think we could also follow this approach by eagerly generating a projection
operator when we select aggregation fields from a Table or GroupedTable.
I have almost finished the code and will open a PR soon.
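To make the idea concrete, here is a minimal sketch of the eager projection on a toy plan model. The node classes (`Scan`, `Project`, `Aggregate`, `AggCall`) and `projectEagerly` are hypothetical stand-ins written for this example; they are not Calcite's RelNode classes and not the code from the upcoming PR.

```scala
object EagerProjection {
  // Hypothetical, simplified plan nodes; these are not Calcite's RelNode classes.
  sealed trait Node
  final case class Scan(table: String) extends Node
  final case class Project(fields: Seq[Int], input: Node) extends Node
  final case class AggCall(function: String, inputField: Int)
  final case class Aggregate(groupKeys: Seq[Int], calls: Seq[AggCall], input: Node)
      extends Node

  // Insert a projection below the aggregate that keeps only the fields it reads,
  // then rewrite the aggregate's field references to positions in that projection.
  def projectEagerly(agg: Aggregate): Aggregate = {
    // Fields read by the aggregate: grouping keys plus aggregate arguments.
    val used = (agg.groupKeys ++ agg.calls.map(_.inputField)).distinct.sorted
    // Old input position -> position in the narrowed projection.
    val newIndex = used.zipWithIndex.toMap
    Aggregate(
      agg.groupKeys.map(newIndex),
      agg.calls.map(c => c.copy(inputField = newIndex(c.inputField))),
      Project(used, agg.input))
  }

  def main(args: Array[String]): Unit = {
    // LogicalAggregate(group=[{1}], TMP_0=[SUM($5)]) over the supplier scan
    val before = Aggregate(Seq(1), Seq(AggCall("SUM", 5)), Scan("supplier"))
    println(projectEagerly(before))
  }
}
```

For the Table API plan above, the rewritten aggregate groups on field 0 and sums field 1 over a projection of fields 1 and 5, which is the same shape as the plan Calcite produces for the SQL query.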
> Eagerly project unused fields when selecting aggregation fields
> ---------------------------------------------------------------
>
> Key: FLINK-5266
> URL: https://issues.apache.org/jira/browse/FLINK-5266
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Reporter: Kurt Young
> Assignee: Kurt Young
>
> When we call a table's {{select}} method and it contains aggregations,
> we project fields only after the aggregation. It would be better to project
> away unused fields before the aggregation, which also leaves the
> opportunity to push the projection into the scan.
> For example, the current logical plan of a simple query:
> {code}
> table.select('a.sum as 's, 'a.max)
> {code}
> is
> {code}
> LogicalProject(s=[$0], TMP_2=[$1])
>   LogicalAggregate(group=[{}], TMP_0=[SUM($5)], TMP_1=[MAX($5)])
>     LogicalTableScan(table=[[supplier]])
> {code}
> It would be better if we could project away unused fields right after the
> scan, so that the plan looks like this:
> {code}
> LogicalProject(s=[$0], TMP_2=[$1])
>   LogicalAggregate(group=[{}], TMP_0=[SUM($0)], TMP_1=[MAX($0)])
>     LogicalProject(a=[$5])
>       LogicalTableScan(table=[[supplier]])
> {code}