friendlymatthew commented on code in PR #20822:
URL: https://github.com/apache/datafusion/pull/20822#discussion_r2906168627
##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -1146,6 +1198,228 @@ mod test {
);
}
+ /// get_field(struct_col, 'a') on a struct with a primitive leaf should
allow pushdown.
+ #[test]
+ fn get_field_on_struct_allows_pushdown() {
+ let table_schema = Arc::new(Schema::new(vec![Field::new(
+ "struct_col",
+ DataType::Struct(
+ vec![Arc::new(Field::new("a", DataType::Int32, true))].into(),
+ ),
+ true,
+ )]));
+
+ // get_field(struct_col, 'a') > 5
+ let get_field_expr = datafusion_functions::core::get_field().call(vec![
+ col("struct_col"),
+ Expr::Literal(ScalarValue::Utf8(Some("a".to_string())), None),
+ ]);
+ let expr =
get_field_expr.gt(Expr::Literal(ScalarValue::Int32(Some(5)), None));
+ let expr = logical2physical(&expr, &table_schema);
+
+ assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
+ }
+
+ /// get_field on a struct field that resolves to a nested type should
still block pushdown.
+ #[test]
+ fn get_field_on_nested_leaf_prevents_pushdown() {
+ let inner_struct = DataType::Struct(
+ vec![Arc::new(Field::new("x", DataType::Int32, true))].into(),
+ );
+ let table_schema = Arc::new(Schema::new(vec![Field::new(
+ "struct_col",
+ DataType::Struct(
+ vec![Arc::new(Field::new("nested", inner_struct,
true))].into(),
+ ),
+ true,
+ )]));
+
+ // get_field(struct_col, 'nested') IS NOT NULL — the leaf is still a
struct
+ let get_field_expr = datafusion_functions::core::get_field().call(vec![
+ col("struct_col"),
+ Expr::Literal(ScalarValue::Utf8(Some("nested".to_string())), None),
+ ]);
+ let expr = get_field_expr.is_not_null();
+ let expr = logical2physical(&expr, &table_schema);
+
+ assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
+ }
+
+ /// get_field on a struct produces correct Parquet leaf indices.
+ #[test]
+ fn get_field_filter_candidate_has_correct_leaf_indices() {
+ use arrow::array::{Int32Array, StringArray, StructArray};
+
+ // Schema: id (Int32), s (Struct{value: Int32, label: Utf8})
+ // Parquet leaves: id=0, s.value=1, s.label=2
+ let struct_fields: arrow::datatypes::Fields = vec![
+ Arc::new(Field::new("value", DataType::Int32, false)),
+ Arc::new(Field::new("label", DataType::Utf8, false)),
+ ]
+ .into();
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("id", DataType::Int32, false),
+ Field::new("s", DataType::Struct(struct_fields.clone()), false),
+ ]));
+
+ let batch = RecordBatch::try_new(
+ Arc::clone(&schema),
+ vec![
+ Arc::new(Int32Array::from(vec![1, 2, 3])),
+ Arc::new(StructArray::new(
+ struct_fields,
+ vec![
+ Arc::new(Int32Array::from(vec![10, 20, 30])) as _,
+ Arc::new(StringArray::from(vec!["a", "b", "c"])) as _,
+ ],
+ None,
+ )),
+ ],
+ )
+ .unwrap();
+
+ let file = NamedTempFile::new().expect("temp file");
+ let mut writer =
+ ArrowWriter::try_new(file.reopen().unwrap(), Arc::clone(&schema),
None)
+ .expect("writer");
+ writer.write(&batch).expect("write batch");
+ writer.close().expect("close writer");
+
+ let reader_file = file.reopen().expect("reopen file");
+ let builder = ParquetRecordBatchReaderBuilder::try_new(reader_file)
+ .expect("reader builder");
+ let metadata = builder.metadata().clone();
+ let file_schema = builder.schema().clone();
+
+ // get_field(s, 'value') > 5
+ let get_field_expr = datafusion_functions::core::get_field().call(vec![
+ col("s"),
+ Expr::Literal(ScalarValue::Utf8(Some("value".to_string())), None),
+ ]);
+ let expr =
get_field_expr.gt(Expr::Literal(ScalarValue::Int32(Some(5)), None));
+ let expr = logical2physical(&expr, &file_schema);
+
+ let candidate = FilterCandidateBuilder::new(expr, file_schema)
+ .build(&metadata)
+ .expect("building candidate")
+ .expect("get_field filter on struct should be pushable");
+
+ // The root column is s (Arrow index 1), which expands to Parquet
+ // leaves 1 (s.value) and 2 (s.label).
+ assert_eq!(
+ candidate.projection.leaf_indices,
+ vec![1, 2],
+ "leaf_indices should contain both leaves of struct s"
+ );
+ }
+
+ /// Deeply nested get_field: get_field(struct_col, 'outer', 'inner') where
the
+ /// leaf is primitive should allow pushdown. The logical simplifier
flattens
+ /// nested get_field(get_field(col, 'a'), 'b') into get_field(col, 'a',
'b').
+ #[test]
+ fn get_field_deeply_nested_allows_pushdown() {
Review Comment:
Some proof that nested field access works
##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -368,6 +394,31 @@ impl TreeNodeVisitor<'_> for PushdownChecker<'_> {
type Node = Arc<dyn PhysicalExpr>;
fn f_down(&mut self, node: &Self::Node) -> Result<TreeNodeRecursion> {
+ // check for get_field(Column("foo"), "bar", ...) accessing a struct
field
+ // the resolved return type tells us the leaf type
+ if it's non-nested, we can push down by recording the root column
index and skip children
+ // this way, the visitor never sees the raw struct Column and rejects
it
+ if let Some(func) = node.as_any().downcast_ref::<ScalarFunctionExpr>()
+ && func.name() == "get_field"
Review Comment:
We specifically target `get_field` expressions here because it's the only
way to access struct fields in datafusion. Bracket notation, dot notation, and
the function form all compile to it
##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -294,6 +303,23 @@ impl<'schema> PushdownChecker<'schema> {
}
}
+ fn check_struct_field_column(
Review Comment:
like `check_single_column`, but for struct columns accessed via `get_field`
We record the root column index without blocking on the struct type, since
the caller has already verified the accessed leaf is a primitive
##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -1146,6 +1198,228 @@ mod test {
);
}
+ /// get_field(struct_col, 'a') on a struct with a primitive leaf should
allow pushdown.
+ #[test]
+ fn get_field_on_struct_allows_pushdown() {
+ let table_schema = Arc::new(Schema::new(vec![Field::new(
+ "struct_col",
+ DataType::Struct(
+ vec![Arc::new(Field::new("a", DataType::Int32, true))].into(),
+ ),
+ true,
+ )]));
+
+ // get_field(struct_col, 'a') > 5
+ let get_field_expr = datafusion_functions::core::get_field().call(vec![
+ col("struct_col"),
+ Expr::Literal(ScalarValue::Utf8(Some("a".to_string())), None),
+ ]);
+ let expr =
get_field_expr.gt(Expr::Literal(ScalarValue::Int32(Some(5)), None));
+ let expr = logical2physical(&expr, &table_schema);
+
+ assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
+ }
+
+ /// get_field on a struct field that resolves to a nested type should
still block pushdown.
+ #[test]
Review Comment:
we don't allow all struct columns through because Parquet doesn't store
structs as a single column; i.e., we don't push `struct_col IS NOT NULL` down
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]