adriangb commented on code in PR #20854:
URL: https://github.com/apache/datafusion/pull/20854#discussion_r2919587066


##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -330,16 +345,17 @@ impl<'schema> PushdownChecker<'schema> {
     fn check_struct_field_column(
         &mut self,
         column_name: &str,
+        field_path: Vec<String>,
     ) -> Option<TreeNodeRecursion> {
-        let idx = match self.file_schema.index_of(column_name) {
-            Ok(idx) => idx,
-            Err(_) => {
-                self.projected_columns = true;
-                return Some(TreeNodeRecursion::Jump);
-            }
+        let Ok(idx) = self.file_schema.index_of(column_name) else {
+            self.projected_columns = true;
+            return Some(TreeNodeRecursion::Jump);

Review Comment:
   I know this is pre-existing, but what is this code path supposed to catch?
Under what circumstances would there be a column that doesn't exist in the file 
schema at this point, and why is it a "projected" column?
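A minimal, std-only illustration of when this branch can fire. It assumes one plausible case: a table whose schema gained a column after an older Parquet file was written. `FileSchema`, `ColumnCheck`, and `check_column` are hypothetical stand-ins, not DataFusion or arrow types:

```rust
/// Hypothetical stand-in for a Parquet file's Arrow schema: top-level field names only.
pub struct FileSchema {
    pub fields: Vec<&'static str>,
}

impl FileSchema {
    /// Same shape as a by-name lookup such as arrow's `Schema::index_of`:
    /// the field's position, or an error if this file has no such column.
    pub fn index_of(&self, name: &str) -> Result<usize, String> {
        self.fields
            .iter()
            .position(|f| *f == name)
            .ok_or_else(|| format!("column {name} not found in file schema"))
    }
}

/// Hypothetical outcome of checking one column reference in a filter.
#[derive(Debug, PartialEq)]
pub enum ColumnCheck {
    /// The column exists in this file at the given index.
    InFile(usize),
    /// The column is missing from this file; in the diff this is the branch
    /// that sets `projected_columns = true` and returns `TreeNodeRecursion::Jump`.
    NotInFile,
}

pub fn check_column(file_schema: &FileSchema, column_name: &str) -> ColumnCheck {
    let Ok(idx) = file_schema.index_of(column_name) else {
        return ColumnCheck::NotInFile;
    };
    ColumnCheck::InFile(idx)
}
```

With `fields: vec!["id", "s"]`, looking up `"s"` gives `InFile(1)`, while a column the file was never written with falls through to `NotInFile`.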



##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -478,8 +505,24 @@ impl TreeNodeVisitor<'_> for PushdownChecker<'_> {
 struct PushdownColumns {
     /// Sorted, unique column indices into the file schema required to evaluate
     /// the filter expression. Must be in ascending order for correct schema
-    /// projection matching.
+    /// projection matching. Does not include struct columns accessed via `get_field`.
     required_columns: Vec<usize>,
+    /// Struct field accesses via `get_field`. Each entry records the root struct
+    /// column index and the field path being accessed.
+    struct_field_accesses: Vec<StructFieldAccess>,
+}
+
+/// Records a struct field access via `get_field(struct_col, 'field1', 'field2', ...)`.
+///
+/// This allows the row filter to project only the specific Parquet leaf columns
+/// needed by the filter, rather than all leaves of the struct.
+#[derive(Debug, Clone)]
+struct StructFieldAccess {
+    /// Arrow root column index of the struct in the file schema.
+    root_index: usize,
+    /// Field names forming the path into the struct.
+    /// e.g., `["value"]` for `s['value']`, `["outer", "inner"]` for `s['outer']['inner']`.

Review Comment:
   I assume we don't support stuff like `array_has_any(get_field(s, 'items'), 5)`?
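Going by the `!DataType::is_nested(return_type)` guard in the `get_field` hunk further down, an access like `get_field(s, 'items')` that returns a list should not reach the leaf-pruning path at all. A toy model of that guard, with a hypothetical `Ty` enum standing in for arrow's `DataType`:

```rust
/// Hypothetical, simplified stand-in for arrow's `DataType` (not the real enum).
#[derive(Debug, Clone, PartialEq)]
pub enum Ty {
    Int64,
    List(Box<Ty>),
    Struct(Vec<(String, Ty)>),
}

impl Ty {
    /// Same intent as `DataType::is_nested`: lists and structs count as nested.
    pub fn is_nested(&self) -> bool {
        matches!(self, Ty::List(_) | Ty::Struct(_))
    }
}

/// Type produced by `get_field(root, path...)`, or `None` if some path
/// segment does not name a child of a struct.
pub fn get_field_type(root: &Ty, path: &[&str]) -> Option<Ty> {
    let mut current = root;
    for name in path {
        let Ty::Struct(children) = current else {
            return None;
        };
        current = &children.iter().find(|(n, _)| n.as_str() == *name)?.1;
    }
    Some(current.clone())
}

/// Leaf-level pushdown is only considered when the accessed field is not
/// nested, mirroring the `!DataType::is_nested(return_type)` check in the diff.
pub fn eligible_for_leaf_pushdown(root: &Ty, path: &[&str]) -> bool {
    get_field_type(root, path).is_some_and(|t| !t.is_nested())
}
```

With `s` holding `value: Int64` and `items: List<Int64>`, only `["value"]` is eligible; `["items"]` is rejected because its type is nested.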



##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -519,6 +562,149 @@ fn leaf_indices_for_roots(
         .collect()
 }
 
+/// Resolves struct field access to specific Parquet leaf column indices
+///
+/// For every `StructFieldAccess`, finds the leaf columns in the Parquet schema
+/// whose path matches the struct root name + field path. This avoids reading all
+/// leaves of a struct when only specific fields are needed
+fn resolve_struct_field_leaves(
+    accesses: &[StructFieldAccess],
+    file_schema: &Schema,
+    schema_descr: &SchemaDescriptor,
+) -> Vec<usize> {

Review Comment:
   Let's make sure this logic is shared with the projection code path.
   
   More generally, there should be a single place with a function along the lines of:
   
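   ```rust
   use std::sync::Arc;
   
   use arrow::datatypes::SchemaRef;
   use datafusion_physical_expr::PhysicalExpr;
   use parquet::arrow::ProjectionMask;
   
   fn build_parquet_read_plan(expr: &Arc<dyn PhysicalExpr>) -> ParquetReadPlan {
     todo!()
   }
   
   struct ParquetReadPlan {
      /// Leaf-level projection into the Parquet file
      projection_mask: ProjectionMask,
      /// The schema to read the data back with
      schema: SchemaRef,
      /// The transformed expression (do we need this?)
      expr: Arc<dyn PhysicalExpr>,
   }
   ```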
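A std-only sketch of the piece that could be shared: component-wise prefix matching over leaf paths, plus a hypothetical `leaves_for_accesses` entry point that both the projection and the row filter could call. Leaf paths are plain `Vec<&str>` here instead of parquet `ColumnPath`s; the sorted, de-duplicated indices are the kind of input one would pass to `ProjectionMask::leaves`:

```rust
/// Indices of all leaf columns whose path starts with `prefix`, compared
/// component by component (so `["s", "val"]` does not match `["s", "value"]`).
pub fn leaves_under(leaf_paths: &[Vec<&str>], prefix: &[&str]) -> Vec<usize> {
    leaf_paths
        .iter()
        .enumerate()
        .filter(|(_, path)| path.starts_with(prefix))
        .map(|(idx, _)| idx)
        .collect()
}

/// Hypothetical shared entry point: projection and filter both pass the
/// field paths they access and get back one sorted, de-duplicated leaf list.
pub fn leaves_for_accesses(leaf_paths: &[Vec<&str>], accesses: &[Vec<&str>]) -> Vec<usize> {
    let mut leaves: Vec<usize> = accesses
        .iter()
        .flat_map(|prefix| leaves_under(leaf_paths, prefix))
        .collect();
    // Overlapping accesses (e.g. `s.outer` and `s.outer.inner`) hit the same leaves.
    leaves.sort_unstable();
    leaves.dedup();
    leaves
}
```

Comparing whole path components via `slice::starts_with`, rather than joined strings, is what keeps a prefix like `["s", "val"]` from accidentally selecting the leaf `["s", "value"]`.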



##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -519,6 +562,149 @@ fn leaf_indices_for_roots(
         .collect()
 }
 
+/// Resolves struct field access to specific Parquet leaf column indices
+///
+/// For every `StructFieldAccess`, finds the leaf columns in the Parquet schema
+/// whose path matches the struct root name + field path. This avoids reading all
+/// leaves of a struct when only specific fields are needed
+fn resolve_struct_field_leaves(
+    accesses: &[StructFieldAccess],
+    file_schema: &Schema,
+    schema_descr: &SchemaDescriptor,
+) -> Vec<usize> {
+    let mut leaf_indices = Vec::new();
+
+    for access in accesses {
+        let root_name = file_schema.field(access.root_index).name();
+        let prefix = std::iter::once(root_name.as_str())
+            .chain(access.field_path.iter().map(|p| p.as_str()))
+            .collect::<Vec<_>>();
+
+        for leaf_idx in 0..schema_descr.num_columns() {
+            let col = schema_descr.column(leaf_idx);
+            let col_path = col.path().parts();
+
+            // A leaf matches if its path starts with our prefix.
+            // e.g., prefix=["s", "value"] matches leaf path ["s", "value"]
+            // prefix=["s", "outer"] matches ["s", "outer", "inner"]
+
+            // a leaf matches iff its path starts with our prefix
+            // for example: prefix=["s", "value"] matches leaf path ["s", "value"]
+            //              prefix=["s", "outer"] matches ["s", "outer", "inner"]
+            let leaf_matches_path = col_path.len() >= prefix.len()
+                && col_path.iter().zip(prefix.iter()).all(|(a, b)| a == b);
+
+            if leaf_matches_path {
+                leaf_indices.push(leaf_idx);
+            }
+        }
+    }
+
+    leaf_indices
+}
+
+/// Builds a filter schema that includes only the fields actually accessed by the
+/// filter expression.
+///
+/// For regular (non-struct) columns, the full field type is used.
+/// For struct columns accessed via `get_field`, a pruned struct type is created

Review Comment:
   Instead of a pruned struct type, why not transform the expression from
`get_field(s, 'f')` -> `Alias(Column("s.f", 123), "get_field(s, 'f')")` or
something like that? Then we don't need to manipulate the data (assemble a
struct with pruned fields).
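A toy version of the suggested rewrite, using a hypothetical three-variant `Expr` enum rather than DataFusion's `PhysicalExpr`. It turns `get_field(s, 'f')` into `Alias(Column("s.f", idx), "get_field(s, 'f')")`, so the filter reads the leaf as an ordinary column and no pruned struct has to be assembled. The `leaf_index` lookup is an assumption standing in for however the flattened leaf column would be located, and only the top-level node is rewritten:

```rust
/// Hypothetical, minimal expression tree (not DataFusion's `PhysicalExpr`).
#[derive(Debug, Clone, PartialEq)]
pub enum Expr {
    /// A column read from the file: name and index.
    Column(String, usize),
    /// `get_field(input, path...)` on a struct-typed input.
    GetField(Box<Expr>, Vec<String>),
    /// Wraps an expression so it keeps its original output name.
    Alias(Box<Expr>, String),
}

/// Rewrites `get_field(Column(root), path)` into a direct reference to the
/// flattened leaf column, aliased back to the original display name.
/// `leaf_index` is a hypothetical lookup from a dotted path to a column index.
pub fn flatten_get_field(expr: Expr, leaf_index: impl Fn(&str) -> Option<usize>) -> Expr {
    match expr {
        Expr::GetField(input, path) => {
            if let Expr::Column(root, _) = &*input {
                // Display name of the original call, e.g. "get_field(s, 'f')".
                let display = format!("get_field({root}, '{}')", path.join("', '"));
                // Dotted name of the flattened leaf, e.g. "s.f" or "s.outer.inner".
                let flat_name = format!("{root}.{}", path.join("."));
                if let Some(idx) = leaf_index(&flat_name) {
                    return Expr::Alias(Box::new(Expr::Column(flat_name, idx)), display);
                }
            }
            // Unresolvable leaf or non-column input: keep the original `get_field`.
            Expr::GetField(input, path)
        }
        other => other,
    }
}
```

A real version would apply this at every node of the filter (for example in a bottom-up tree transform) and leave the original `get_field` in place whenever the leaf cannot be resolved, as the fallback arm does here.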



##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -449,7 +466,17 @@ impl TreeNodeVisitor<'_> for PushdownChecker<'_> {
             {
                 let return_type = func.return_type();
                 if !DataType::is_nested(return_type) {
-                    if let Some(recursion) = self.check_struct_field_column(column.name())
+                    let field_path = args[1..]
+                        .iter()
+                        .filter_map(|arg| {
+                            arg.as_any().downcast_ref::<Literal>().and_then(|lit| {
+                                lit.value().try_as_str().flatten().map(|s| s.to_string())
+                            })
+                        })
+                        .collect();

Review Comment:
   Safer behavior would be to fall back to reading the entire struct root, and
passing that to the `get_field` UDF, whenever a path argument is not a string literal.
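In the diff above, `filter_map` drops any argument that is not a non-null string literal, so a call like `get_field(s, 'outer', some_expr)` would be recorded with the shorter path `["outer"]`. A sketch of the fallback, with a hypothetical `Arg` enum in place of the real argument expressions: collecting into `Option<Vec<_>>` turns the whole path into `None` as soon as one argument is not a string literal, and the caller then reads the entire root:

```rust
/// Hypothetical stand-in for a `get_field` path argument.
pub enum Arg {
    StrLiteral(String),
    /// Any other expression: a column, a null or non-string literal, a computed value, ...
    Other,
}

/// What the row filter should read for a `get_field(root, args...)` call.
#[derive(Debug, PartialEq)]
pub enum FieldRead {
    /// Every path argument is a string literal: read just that sub-path.
    Path(Vec<String>),
    /// Some argument is not a string literal: read the whole struct root and
    /// let the `get_field` UDF evaluate it.
    WholeRoot,
}

pub fn plan_field_read(path_args: &[Arg]) -> FieldRead {
    // Collecting into `Option<Vec<_>>` short-circuits to `None` on the first
    // non-literal argument instead of silently skipping it like `filter_map`.
    let path: Option<Vec<String>> = path_args
        .iter()
        .map(|arg| match arg {
            Arg::StrLiteral(s) => Some(s.clone()),
            Arg::Other => None,
        })
        .collect();
    match path {
        Some(path) if !path.is_empty() => FieldRead::Path(path),
        _ => FieldRead::WholeRoot,
    }
}
```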



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
