[
https://issues.apache.org/jira/browse/FLINK-39804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-39804:
-----------------------------------
Labels: pull-request-available (was: )
> Nested projection fields not pushed down to source
> --------------------------------------------------
>
> Key: FLINK-39804
> URL: https://issues.apache.org/jira/browse/FLINK-39804
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Reporter: Daniel Rossos
> Priority: Major
> Labels: pull-request-available
>
> h1. Summary
> When a source supports nested projection pushdown and a filter that cannot
> be pushed down sits between the projection and the source scan, only the
> top-level columns of the projection are pushed down into the source.
> High-level example:
> {{SQL Query Example:}}
> {{SELECT a.aOne, b FROM t1 WHERE <unpushable Filter> }}
> This results in the following optimized execution plan:
> {{== Optimized Execution Plan ==}}
> {{Calc(select=[a.aOne, b], where=[<filter>])}}
> {{+- TableSourceScan(table=[[c, db, t1, filter=[], project=[a, b]]],
> fields=[a, b])}}
> h1. Detailed Trace
> As part of the
> [PROJECT_RULES|https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala#L212]
> in our query optimizer, we use the Calcite rule
> CoreRules.PROJECT_FILTER_TRANSPOSE. To
> [summarize|https://github.com/apache/calcite/blob/main/core/src/main/java/org/apache/calcite/rel/rules/ProjectFilterTransposeRule.java#L92],
> this rule checks whether a Project node sits above a Filter node in the
> plan. If so, it attempts to transpose the two so that the Project ends up
> below the Filter, which in turn allows the projection to be pushed down
> into the source scan. The problem is that, in its default
> PROJECT_FILTER_TRANSPOSE mode, the rule uses a
> [PushProjector|https://github.com/apache/calcite/blob/main/core/src/main/java/org/apache/calcite/rel/rules/ProjectFilterTransposeRule.java#L176]
> to traverse the projection. Further down the call chain this ends up in
> visitInputRef(), which records only the top-level index of each referenced
> column, so the nested field path (a.aOne in the example query) is lost.
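The difference between recording a top-level index and recording a full field path can be sketched with a toy model. This is not Calcite code: InputRef, FieldAccess, collect_top_level and collect_paths are hypothetical names chosen for illustration, and the column layout of t1 (a at index 0, b at index 1) is assumed from the example query.

```python
from dataclasses import dataclass
from typing import Union


@dataclass(frozen=True)
class InputRef:
    """Reference to a top-level input column by index (hypothetical model)."""
    index: int


@dataclass(frozen=True)
class FieldAccess:
    """Access to a named field of a row-typed expression, e.g. a.aOne."""
    expr: "Expr"
    field: str


Expr = Union[InputRef, FieldAccess]


def top_level_index(expr: Expr) -> int:
    """Walk down to the underlying InputRef and keep only its index."""
    while isinstance(expr, FieldAccess):
        expr = expr.expr
    return expr.index


def full_path(expr: Expr) -> tuple:
    """Keep the column index plus every nested field name on the way."""
    if isinstance(expr, InputRef):
        return (expr.index,)
    return full_path(expr.expr) + (expr.field,)


def collect_top_level(projection):
    """Mimics recording only top-level column indexes."""
    return sorted({top_level_index(e) for e in projection})


def collect_paths(projection):
    """Mimics preserving the whole projected expression."""
    return [full_path(e) for e in projection]


# Assumed layout: t1(a ROW<aOne, ...>, b), so a is column 0 and b is column 1.
# SELECT a.aOne, b
projection = [FieldAccess(InputRef(0), "aOne"), InputRef(1)]

top_level = collect_top_level(projection)  # columns a and b as a whole
paths = collect_paths(projection)          # a.aOne and b
```

With only the top-level indexes available, the scan can be told to read columns a and b but not that just a.aOne is needed, which matches the project=[a, b] seen in the plan.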
> h1. Potential Solution / Investigation
> In our testing, replacing the rule with
> PROJECT_FILTER_TRANSPOSE_WHOLE_EXPRESSIONS or
> PROJECT_FILTER_TRANSPOSE_WHOLE_PROJECT_EXPRESSIONS in the
> [PROJECT_REWRITE|https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkStreamProgram.scala#L253]
> rule set* avoided the issue, because these variants push whole
> project/filter expressions down instead of reducing them to top-level
> field references.
>
> *Note: I specify PROJECT_REWRITE deliberately. If we use the
> PROJECT_FILTER_TRANSPOSE_WHOLE_EXPRESSIONS transpose rule in the Volcano
> LOGICAL phase, the planner oscillates indefinitely:
> PROJECT_FILTER_TRANSPOSE_WHOLE_EXPRESSIONS pushes the Project below the
> Filter, FILTER_PROJECT_TRANSPOSE can push the Filter back below that
> Project, and PROJECT_MERGE collapses the resulting Projects back into the
> original shape, making the same rewrite applicable again. In our testing we
> left the transpose rule in the LOGICAL phase at its current default, but we
> are still analyzing the implications.
> I am still investigating, but would be interested to hear from someone with
> a deeper understanding of the Calcite planner and Flink's table
> optimizations.
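The oscillation described in the note can be illustrated with a simplified model that treats a plan as a list of operator names from root to leaf. The three functions below are hypothetical stand-ins for the rules named in the note and only mimic how each one reshapes the plan; they say nothing about how the Volcano planner actually schedules or deduplicates rule applications.

```python
def project_filter_transpose_whole(plan):
    """Project over Filter: add a Project below the Filter, keep one on top."""
    for i in range(len(plan) - 1):
        if plan[i:i + 2] == ["Project", "Filter"]:
            return plan[:i + 2] + ["Project"] + plan[i + 2:]
    return plan


def filter_project_transpose(plan):
    """Filter over Project: push the Filter below the Project."""
    for i in range(len(plan) - 1):
        if plan[i:i + 2] == ["Filter", "Project"]:
            return plan[:i] + ["Project", "Filter"] + plan[i + 2:]
    return plan


def project_merge(plan):
    """Two adjacent Projects: collapse them into one."""
    for i in range(len(plan) - 1):
        if plan[i:i + 2] == ["Project", "Project"]:
            return plan[:i] + ["Project"] + plan[i + 2:]
    return plan


start = ["Project", "Filter", "Scan"]
step1 = project_filter_transpose_whole(start)  # Project, Filter, Project, Scan
step2 = filter_project_transpose(step1)        # Project, Project, Filter, Scan
step3 = project_merge(step2)                   # Project, Filter, Scan

# The plan is back to its starting shape, so the first rule matches again.
cycle_detected = step3 == start
```

In this model the three rewrites form a closed loop, which is the shape of the problem reported for the LOGICAL phase; a rule set that omits one of the three rewrites does not have it.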