itsjunetime commented on code in PR #12135:
URL: https://github.com/apache/datafusion/pull/12135#discussion_r1737065470
##########
datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs:
##########
@@ -268,90 +259,183 @@ impl<'a> FilterCandidateBuilder<'a> {
/// * `Ok(Some(candidate))` if the expression can be used as an ArrowFilter
/// * `Ok(None)` if the expression cannot be used as an ArrowFilter
/// * `Err(e)` if an error occurs while building the candidate
- pub fn build(
- mut self,
- metadata: &ParquetMetaData,
- ) -> Result<Option<FilterCandidate>> {
- let expr = self.expr.clone().rewrite(&mut self).data()?;
-
- if self.non_primitive_columns || self.projected_columns {
- Ok(None)
- } else {
- let required_bytes =
- size_of_columns(&self.required_column_indices, metadata)?;
-            let can_use_index = columns_sorted(&self.required_column_indices, metadata)?;
-
- Ok(Some(FilterCandidate {
- expr,
- required_bytes,
- can_use_index,
- projection: self.required_column_indices.into_iter().collect(),
- }))
+    pub fn build(self, metadata: &ParquetMetaData) -> Result<Option<FilterCandidate>> {
+ let Some((projection, rewritten_expr)) = non_pushdown_columns(
+ Arc::clone(&self.expr),
+ self.file_schema,
+ self.table_schema,
+ )?
+ else {
+ return Ok(None);
+ };
+
+        let required_bytes = size_of_columns(&self.required_column_indices, metadata)?;
+        let can_use_index = columns_sorted(&self.required_column_indices, metadata)?;
+
+ Ok(Some(FilterCandidate {
+ expr: rewritten_expr,
+ required_bytes,
+ can_use_index,
+ projection,
+ }))
+ }
+}
+
+// a struct that implements TreeNodeRewriter to traverse a PhysicalExpr tree structure to determine
+// if any column references in the expression would prevent it from being predicate-pushed-down.
+// if non_primitive_columns || projected_columns, it can't be pushed down.
+// can't be reused between calls to `rewrite`; each construction must be used only once.
+struct PushdownChecker<'schema> {
+ /// Does the expression require any non-primitive columns (like structs)?
+ non_primitive_columns: bool,
+ /// Does the expression reference any columns that are in the table
+ /// schema but not in the file schema?
+ projected_columns: bool,
+    // the indices of all the columns found within the given expression which exist inside the given
+ // [`file_schema`]
+ required_column_indices: BTreeSet<usize>,
+ file_schema: &'schema Schema,
+ table_schema: &'schema Schema,
+}
+
+impl<'schema> PushdownChecker<'schema> {
+    fn check_single_column(&mut self, column_name: &str) -> Option<TreeNodeRecursion> {
Review Comment:
I made this an associated function so that it can be used on its own or as part of
`PushdownChecker`'s tree-node recursion functions. It sets some state on the
`PushdownChecker` that is needed to track projected columns; we could make it a free
function instead, but it would then need to take all of `PushdownChecker`'s fields as
parameters, which negates the benefit of being a free function.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]