[ 
https://issues.apache.org/jira/browse/FLINK-39804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18087881#comment-18087881
 ] 

Qiu Yanjun commented on FLINK-39804:
------------------------------------

Hi, I opened a PR for this issue:
https://github.com/apache/flink/pull/28370

The fix follows the suggested PROJECT_REWRITE-specific approach by using 
PROJECT_FILTER_TRANSPOSE_WHOLE_EXPRESSIONS only in the project rewrite phase, 
and adds batch/stream plan regression tests.

Could a committer help confirm the approach and assign the ticket if it looks 
reasonable?

> Nested projection fields not pushed down to source
> --------------------------------------------------
>
>                 Key: FLINK-39804
>                 URL: https://issues.apache.org/jira/browse/FLINK-39804
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Daniel Rossos
>            Priority: Major
>              Labels: pull-request-available
>
> h1. Summary
> If a source supports nested projection push-down but a non-pushed-down filter 
> sits between the projection and the source scan, only the top-level columns 
> of the projection are pushed down into the source.
> High-level example:
> {{SQL Query Example:}}
> {{SELECT a.aOne, b FROM t1 WHERE <unpushable Filter> }}
> This results in the following optimized execution plan:
> {{== Optimized Execution Plan ==}}
> {{Calc(select=[a.aOne, b], where=[<filter>])}}
> {{+- TableSourceScan(table=[[c, db, t1, filter=[], project=[a, b]]], 
> fields=[a, b])}}
> h1. Detailed Trace
> As part of the 
> [PROJECT_RULES|https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala#L212]
>  in our query optimizer, we use the Calcite rule 
> CoreRules.PROJECT_FILTER_TRANSPOSE. To 
> [summarize|https://github.com/apache/calcite/blob/main/core/src/main/java/org/apache/calcite/rel/rules/ProjectFilterTransposeRule.java#L92],
>  this rule checks whether a Project node sits above a Filter node in the plan. 
> If it does, the rule attempts to transpose them so that the Project ends up 
> below the Filter. That structure in turn allows the projection to be pushed 
> down into the source scan. The problem is that, in the default 
> PROJECT_FILTER_TRANSPOSE mode, the rule uses a 
> [PushProjector|https://github.com/apache/calcite/blob/main/core/src/main/java/org/apache/calcite/rel/rules/ProjectFilterTransposeRule.java#L176]
>  to traverse the projection. Further down the call chain this reaches 
> visitInputRef(), which records only the top-level index of each column in 
> the projection. 
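
The effect described in the trace above can be illustrated with a small toy model. This is only a sketch: the classes InputRef and FieldAccess and the methods topLevelColumns and fullPath are hypothetical stand-ins invented for illustration, not Calcite's RexNode or PushProjector API. It shows how a traversal that records only the input reference at the bottom of an expression keeps the column index but loses the nested field path.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Toy model only: hypothetical stand-ins, not Calcite's RexNode classes. */
class NestedRefSketch {
    /** Minimal expression tree: a column reference or a field access on another expression. */
    interface Expr {}

    static final class InputRef implements Expr {
        final int index; // top-level column index in the input row
        InputRef(int index) { this.index = index; }
    }

    static final class FieldAccess implements Expr {
        final Expr target;  // expression being dereferenced
        final String field; // nested field name, e.g. "aOne"
        FieldAccess(Expr target, String field) {
            this.target = target;
            this.field = field;
        }
    }

    /** Records only the top-level column index under each projection; the nested path is discarded. */
    static Set<Integer> topLevelColumns(List<Expr> projections) {
        Set<Integer> used = new TreeSet<>();
        for (Expr e : projections) {
            Expr cur = e;
            while (cur instanceof FieldAccess) {
                cur = ((FieldAccess) cur).target; // walk down to the underlying column
            }
            used.add(((InputRef) cur).index);
        }
        return used;
    }

    /** Renders the full access path, i.e. the information a whole-expression push-down would keep. */
    static String fullPath(Expr e, String[] columnNames) {
        if (e instanceof FieldAccess) {
            FieldAccess fa = (FieldAccess) e;
            return fullPath(fa.target, columnNames) + "." + fa.field;
        }
        return columnNames[((InputRef) e).index];
    }
}
```

For the projection list of the example query (a.aOne, b), topLevelColumns yields the indexes of a and b only, which corresponds to project=[a, b] in the plan, while fullPath on the first expression still yields a.aOne.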
> h1. Potential Solution / Investigation
> In our testing, we found that by changing the projection rule to 
> PROJECT_FILTER_TRANSPOSE_WHOLE_EXPRESSIONS or 
> PROJECT_FILTER_TRANSPOSE_WHOLE_PROJECT_EXPRESSIONS in the 
> [PROJECT_REWRITE|https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkStreamProgram.scala#L253]
>  rule-set* we were able to avoid this issue because these variants preserve 
> whole project/filter expressions instead.
>  
> *Note: I specify PROJECT_REWRITE deliberately. If we use the 
> PROJECT_FILTER_TRANSPOSE_WHOLE_EXPRESSIONS transpose rule in the Volcano 
> LOGICAL phase, we get an infinite oscillation: 
> PROJECT_FILTER_TRANSPOSE_WHOLE_EXPRESSIONS pushes the Project below the Filter, 
> FILTER_PROJECT_TRANSPOSE can push the Filter back below that Project, 
> and PROJECT_MERGE collapses the resulting Projects back into the original 
> shape, making the same rewrite applicable again. In our testing we left the 
> transpose rule in the LOGICAL phase at its current default, but we are still 
> analyzing the implications.
> We are still investigating, but would be interested to hear the perspective 
> of someone with a deeper understanding of the Calcite planner and Flink table 
> optimizations. 
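
The oscillation described in the note above can be sketched with a toy model. This is an illustration under simplifying assumptions, not planner code: a plan is reduced to a list of operator names from top to bottom, and the three methods are hypothetical approximations of what PROJECT_FILTER_TRANSPOSE_WHOLE_EXPRESSIONS, FILTER_PROJECT_TRANSPOSE and PROJECT_MERGE do to the plan shape.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only: a plan is a list of operator names, top to bottom. */
class TransposeCycleSketch {
    /** Approximates the project/filter transpose: adds a Project below the Filter. */
    static List<String> pushProjectBelowFilter(List<String> plan) {
        List<String> out = new ArrayList<>(plan);
        if (out.size() >= 2 && out.get(0).equals("Project") && out.get(1).equals("Filter")) {
            out.add(2, "Project"); // Project, Filter, Project, ...
        }
        return out;
    }

    /** Approximates the filter/project transpose: swaps the first Filter that sits directly above a Project. */
    static List<String> pushFilterBelowProject(List<String> plan) {
        List<String> out = new ArrayList<>(plan);
        for (int i = 0; i + 1 < out.size(); i++) {
            if (out.get(i).equals("Filter") && out.get(i + 1).equals("Project")) {
                out.set(i, "Project");
                out.set(i + 1, "Filter");
                break;
            }
        }
        return out;
    }

    /** Approximates project merge: collapses the first pair of adjacent Projects into one. */
    static List<String> mergeProjects(List<String> plan) {
        List<String> out = new ArrayList<>(plan);
        for (int i = 0; i + 1 < out.size(); i++) {
            if (out.get(i).equals("Project") && out.get(i + 1).equals("Project")) {
                out.remove(i + 1); // removes by index
                break;
            }
        }
        return out;
    }
}
```

Starting from Project, Filter, Scan and applying the three methods in order gives Project, Filter, Project, Scan, then Project, Project, Filter, Scan, and finally Project, Filter, Scan again. In this toy model the plan returns to its starting shape, so the first rewrite becomes applicable once more.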



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
