alamb commented on code in PR #20193:
URL: https://github.com/apache/datafusion/pull/20193#discussion_r2775061120
##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -793,37 +793,27 @@ impl DataSource for FileScanConfig {
config: &ConfigOptions,
) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
// Remap filter Column indices to match the table schema (file +
partition columns).
- // This is necessary because filters may have been created against a
different schema
- // (e.g., after projection pushdown) and need to be remapped to the
table schema
- // before being passed to the file source and ultimately serialized.
- // For example, the filter being pushed down is `c1_c2 > 5` and it was
created
- // against the output schema of the this `DataSource` which has
projection `c1 + c2 as c1_c2`.
- // Thus we need to rewrite the filter back to `c1 + c2 > 5` before
passing it to the file source.
+ // This is necessary because filters refer to the output schema of this
`DataSource`
+ // (e.g., after projection pushdown has been applied) and need to be
remapped to the table schema
+ // before being passed to the file source
+ //
+ // For example, consider a filter `c1_c2 > 5` being pushed down. If the
+ // `DataSource` has a projection `c1 + c2 as c1_c2`, the filter must
be rewritten
+ // to refer to the table schema `c1 + c2 > 5`
let table_schema = self.file_source.table_schema().table_schema();
- // If there's a projection with aliases, first map the filters back
through
- // the projection expressions before remapping to the table schema.
let filters_to_remap = if let Some(projection) =
self.file_source.projection() {
- use datafusion_physical_plan::projection::update_expr;
filters
.into_iter()
- .map(|filter| {
- update_expr(&filter, projection.as_ref(),
true)?.ok_or_else(|| {
- internal_datafusion_err!(
- "Failed to map filter expression through
projection: {}",
- filter
- )
- })
- })
+ .map(|filter| projection.unproject_expr(&filter))
Review Comment:
The whole point of the PR is to pull this function call into a method on
`ProjectionExprs` and add documentation
##########
datafusion/physical-expr/src/projection.rs:
##########
@@ -842,7 +901,7 @@ pub fn combine_projections(
pub fn update_expr(
expr: &Arc<dyn PhysicalExpr>,
projected_exprs: &[ProjectionExpr],
- sync_with_child: bool,
+ unproject: bool,
Review Comment:
I found this parameter name to be **super** confusing as there are no
"children" involved.
`unproject` was the best I could come up with -- but I would love some thoughts
on better names.
##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -793,37 +793,27 @@ impl DataSource for FileScanConfig {
config: &ConfigOptions,
) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
// Remap filter Column indices to match the table schema (file +
partition columns).
- // This is necessary because filters may have been created against a
different schema
- // (e.g., after projection pushdown) and need to be remapped to the
table schema
- // before being passed to the file source and ultimately serialized.
- // For example, the filter being pushed down is `c1_c2 > 5` and it was
created
- // against the output schema of the this `DataSource` which has
projection `c1 + c2 as c1_c2`.
- // Thus we need to rewrite the filter back to `c1 + c2 > 5` before
passing it to the file source.
+ // This is necessary because filters refer to the output schema of this
`DataSource`
+ // (e.g., after projection pushdown has been applied) and need to be
remapped to the table schema
+ // before being passed to the file source
+ //
+ // For example, consider a filter `c1_c2 > 5` being pushed down. If the
+ // `DataSource` has a projection `c1 + c2 as c1_c2`, the filter must
be rewritten
+ // to refer to the table schema `c1 + c2 > 5`
let table_schema = self.file_source.table_schema().table_schema();
- // If there's a projection with aliases, first map the filters back
through
- // the projection expressions before remapping to the table schema.
let filters_to_remap = if let Some(projection) =
self.file_source.projection() {
- use datafusion_physical_plan::projection::update_expr;
filters
.into_iter()
- .map(|filter| {
- update_expr(&filter, projection.as_ref(),
true)?.ok_or_else(|| {
- internal_datafusion_err!(
- "Failed to map filter expression through
projection: {}",
- filter
- )
- })
- })
+ .map(|filter| projection.unproject_expr(&filter))
.collect::<Result<Vec<_>>>()?
} else {
filters
};
// Now remap column indices to match the table schema.
- let remapped_filters: Result<Vec<_>> = filters_to_remap
+ let remapped_filters = filters_to_remap
Review Comment:
Drive-by cleanup (no functional change).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]