[ https://issues.apache.org/jira/browse/FLINK-38832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18047343#comment-18047343 ]

Xuyang Zhong commented on FLINK-38832:
--------------------------------------

This seems related to https://issues.apache.org/jira/browse/FLINK-35854. cc 
[~Sergey Nuyanzin]  

Before Calcite 1.35, both `RelBuilder#aggregate(GroupKey groupKey, AggCall... 
aggCalls)` and `RelBuilder#aggregate(GroupKey groupKey, List<AggregateCall> 
aggregateCalls)` delegate to `RelBuilder#aggregate(GroupKey groupKey, 
Iterable<AggCall> aggCalls)` to build the aggregate, and Flink overrides that 
method in `FlinkRelBuilder`:

!image-2025-12-24-10-51-40-936.png|width=408,height=262!

However, in Calcite 1.35, `RelBuilder#aggregate(GroupKey groupKey, 
List<AggregateCall> aggregateCalls)` calls the private method `aggregate_` 
directly, bypassing the overridable method, so Flink has no chance to intercept 
the call and rewrite the plan.
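To make the dispatch change concrete, here is a minimal, self-contained Java sketch. All class and method names in it (`OldRelBuilder`, `NewRelBuilder`, `FlinkLikeOld`, `FlinkLikeNew`, and the simplified `AggCall`/`AggregateCall` types) are hypothetical stand-ins, not the real Calcite or Flink classes; it only models the overload delegation described above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, heavily simplified stand-ins for Calcite's RelBuilder and
// Flink's FlinkRelBuilder. These are NOT the real classes or signatures.
public class OverloadDispatchDemo {

    static final class AggCall {
        final String name;
        AggCall(String name) { this.name = name; }
    }

    static final class AggregateCall {
        final String name;
        AggregateCall(String name) { this.name = name; }
    }

    /** Models the pre-1.35 shape: the List overload funnels into the public Iterable overload. */
    static class OldRelBuilder {
        String aggregate(String groupKey, List<AggregateCall> calls) {
            List<AggCall> converted = new ArrayList<>();
            for (AggregateCall c : calls) {
                converted.add(new AggCall(c.name));
            }
            // Resolves to the Iterable<AggCall> overload, which subclasses can override.
            return aggregate(groupKey, converted);
        }

        String aggregate(String groupKey, Iterable<AggCall> calls) {
            return "base";
        }
    }

    /** Models the 1.35 shape: the List overload calls a private helper directly. */
    static class NewRelBuilder {
        String aggregate(String groupKey, List<AggregateCall> calls) {
            // Private helper: an override of the Iterable overload is never consulted.
            return aggregate_(groupKey);
        }

        String aggregate(String groupKey, Iterable<AggCall> calls) {
            return aggregate_(groupKey);
        }

        private String aggregate_(String groupKey) {
            return "base";
        }
    }

    /** Flink-like subclass overriding only the Iterable overload (old shape). */
    static class FlinkLikeOld extends OldRelBuilder {
        @Override
        String aggregate(String groupKey, Iterable<AggCall> calls) {
            return "flink override";
        }
    }

    /** Same override, but on top of the new shape. */
    static class FlinkLikeNew extends NewRelBuilder {
        @Override
        String aggregate(String groupKey, Iterable<AggCall> calls) {
            return "flink override";
        }
    }

    public static void main(String[] args) {
        List<AggregateCall> calls = Collections.singletonList(new AggregateCall("COUNT"));
        System.out.println(new FlinkLikeOld().aggregate("{}", calls)); // flink override
        System.out.println(new FlinkLikeNew().aggregate("{}", calls)); // base
    }
}
```

Running `main` prints `flink override` followed by `base`: once the `List` overload routes through a private method, the subclass override is never reached, which matches the behaviour described for Calcite 1.35.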

From my perspective, the original plan is better.

 

> Projection pushdown of count(*) in planner changes
> ---------------------------------------------------
>
>                 Key: FLINK-38832
>                 URL: https://issues.apache.org/jira/browse/FLINK-38832
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 2.2.0
>            Reporter: Hongshun Wang
>            Priority: Major
>             Fix For: 2.2.1
>
>         Attachments: image-2025-12-24-10-50-49-543.png, 
> image-2025-12-24-10-51-11-151.png, image-2025-12-24-10-51-40-936.png
>
>
> Since 2.2.0, the EXPLAIN output of this statement has changed: 
> {code:java}
> select count(*) from log_table;{code}
>  
> In 2.1, only the first field of the source is needed, so that projection is 
> pushed down into the source.
> {code:java}
> FlinkLogicalCalc(select=[EXPR$0])
> +- FlinkLogicalAggregate(group=[{0}], EXPR$0=[COUNT()])
>    +- FlinkLogicalTableSourceScan(table=[[testcatalog, defaultdb, log_table, project=[id]]], fields=[id])
> HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0])
> +- Exchange(distribution=[single])
>    +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
>       +- TableSourceScan(table=[[testcatalog, defaultdb, log_table, project=[id]]], fields=[id]) {code}
>  
> In 2.2, the projection on id is not pushed down, so the source scan reads all fields.
> {code:java}
> LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
> +- LogicalProject(exprs=[[0]])
>    +- LogicalTableScan(table=[[testcatalog, defaultdb, test_log_table]])
> HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0])
> +- Exchange(distribution=[single])
>    +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
>       +- TableSourceScan(table=[[testcatalog, defaultdb, log_table]], fields=[id, address, name]) {code}
> In the logical plan, the FlinkLogicalAggregate is replaced by a LogicalAggregate on top of a LogicalProject.
>  
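A rough sketch of why the 2.1 plan can get away with reading a single column: COUNT(*) references no input field, so a column-pruning step only has to keep some field per row in order to count rows. The helper `requiredColumns` below is hypothetical, and the rule "keep the first field when nothing is referenced" is an assumption chosen to match the `project=[id]` in the 2.1 plan, not Flink's actual pruning code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Hypothetical illustration of column pruning under COUNT(*); not Flink's code.
public class CountStarPruningDemo {

    /**
     * Returns the source columns a scan must produce, given the indices of the
     * fields the query references. COUNT(*) references none; by assumption the
     * scan still emits one field per row, so the first field is kept.
     */
    static List<String> requiredColumns(List<String> schema, Set<Integer> referenced) {
        if (referenced.isEmpty()) {
            return Collections.singletonList(schema.get(0));
        }
        List<String> out = new ArrayList<>();
        for (int i = 0; i < schema.size(); i++) {
            if (referenced.contains(i)) {
                out.add(schema.get(i));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> schema = Arrays.asList("id", "address", "name");
        // COUNT(*): nothing referenced, so only the first field is read.
        System.out.println(requiredColumns(schema, Collections.emptySet())); // [id]
    }
}
```

Compare the 2.2 plan, where the scan lists fields=[id, address, name], i.e. no pruning reaches the source.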



--
This message was sent by Atlassian Jira
(v8.20.10#820010)