This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 23b88fbed7 Allow filters on struct fields to be pushed down into
Parquet scan (#20822)
23b88fbed7 is described below
commit 23b88fbed70309926851f6ceea1a490527850e10
Author: Matthew Kim <[email protected]>
AuthorDate: Tue Mar 10 09:43:18 2026 -0400
Allow filters on struct fields to be pushed down into Parquet scan (#20822)
## Which issue does this PR close?
- Related to #20603
## Rationale for this change
This PR enables Parquet row-level filter pushdown for struct field
access expressions, which previously fell back to a full scan followed
by a separate filtering pass — a significant performance penalty for
queries filtering on struct fields in large Parquet files (like Variant types!).
Filters on struct fields like `WHERE s['foo'] > 67` were not being
pushed into the Parquet decoder. This is because `PushdownChecker` sees
the underlying `Column("s")` has a `Struct` type and unconditionally
rejects it, without considering that `get_field` resolves to a primitive
leaf. With this change, deeply nested access like `s['outer']['inner']`
will also get pushed down, because the logical simplifier flattens it
before it reaches the physical plan.
Note: this does not address the projection side and should not be
blocked by it. `SELECT s['foo']` still reads the entire struct rather
than just the needed leaf column. That requires separate changes to how
the opener builds its projection mask.
---
Cargo.lock | 1 +
datafusion/datasource-parquet/Cargo.toml | 1 +
datafusion/datasource-parquet/src/row_filter.rs | 330 ++++++++++++++++++++-
.../test_files/projection_pushdown.slt | 24 +-
4 files changed, 336 insertions(+), 20 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 9c8f2c5935..efab800f22 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2059,6 +2059,7 @@ dependencies = [
"datafusion-datasource",
"datafusion-execution",
"datafusion-expr",
+ "datafusion-functions",
"datafusion-functions-aggregate-common",
"datafusion-functions-nested",
"datafusion-physical-expr",
diff --git a/datafusion/datasource-parquet/Cargo.toml
b/datafusion/datasource-parquet/Cargo.toml
index b865422366..4889059b16 100644
--- a/datafusion/datasource-parquet/Cargo.toml
+++ b/datafusion/datasource-parquet/Cargo.toml
@@ -39,6 +39,7 @@ datafusion-common-runtime = { workspace = true }
datafusion-datasource = { workspace = true }
datafusion-execution = { workspace = true }
datafusion-expr = { workspace = true }
+datafusion-functions = { workspace = true }
datafusion-functions-aggregate-common = { workspace = true }
datafusion-physical-expr = { workspace = true }
datafusion-physical-expr-adapter = { workspace = true }
diff --git a/datafusion/datasource-parquet/src/row_filter.rs
b/datafusion/datasource-parquet/src/row_filter.rs
index 62ba53bb87..0c0d516aea 100644
--- a/datafusion/datasource-parquet/src/row_filter.rs
+++ b/datafusion/datasource-parquet/src/row_filter.rs
@@ -60,9 +60,15 @@
//! still be sorted by size.
//!
//! List-aware predicates (for example, `array_has`, `array_has_all`, and
-//! `array_has_any`) can be evaluated directly during Parquet decoding. Struct
-//! columns and other nested projections that are not explicitly supported will
-//! continue to be evaluated after the batches are materialized.
+//! `array_has_any`) can be evaluated directly during Parquet decoding.
+//! Struct field access via `get_field` is also supported when the accessed
+//! leaf is a primitive type. Filters that reference entire struct columns
+//! rather than individual fields cannot be pushed down and are instead
+//! evaluated after the full batches are materialized.
+//!
+//! For example, given a struct column `s {name: Utf8, value: Int32}`:
+//! - `WHERE s['value'] > 5` — pushed down (accesses a primitive leaf)
+//! - `WHERE s IS NOT NULL` — not pushed down (references the whole struct)
use std::cmp::Ordering;
use std::collections::BTreeSet;
@@ -72,6 +78,7 @@ use arrow::array::BooleanArray;
use arrow::datatypes::{DataType, Schema, SchemaRef};
use arrow::error::{ArrowError, Result as ArrowResult};
use arrow::record_batch::RecordBatch;
+use datafusion_functions::core::getfield::GetFieldFunc;
use parquet::arrow::ProjectionMask;
use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
use parquet::file::metadata::ParquetMetaData;
@@ -80,6 +87,7 @@ use parquet::schema::types::SchemaDescriptor;
use datafusion_common::Result;
use datafusion_common::cast::as_boolean_array;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion,
TreeNodeVisitor};
+use datafusion_physical_expr::ScalarFunctionExpr;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::utils::reassign_expr_columns;
use datafusion_physical_expr::{PhysicalExpr, split_conjunction};
@@ -99,8 +107,11 @@ use super::supported_predicates::supports_list_predicates;
/// An expression can be evaluated as a `DatafusionArrowPredicate` if it:
/// * Does not reference any projected columns
/// * References either primitive columns or list columns used by
-/// supported predicates (such as `array_has_all` or NULL checks). Struct
-/// columns are still evaluated after decoding.
+/// supported predicates (such as `array_has_all` or NULL checks).
+/// * References struct fields via `get_field` where the accessed leaf
+/// is a primitive type (e.g. `get_field(struct_col, 'field') > 5`).
+/// Direct references to whole struct columns are still evaluated after
+/// decoding.
#[derive(Debug)]
pub(crate) struct DatafusionArrowPredicate {
/// the filter expression
@@ -267,9 +278,12 @@ impl FilterCandidateBuilder {
/// prevent the expression from being pushed down to the parquet decoder.
///
/// An expression cannot be pushed down if it references:
-/// - Unsupported nested columns (structs or list fields that are not covered
by
-/// the supported predicate set)
+/// - Unsupported nested columns (whole struct references or list fields that
are
+/// not covered by the supported predicate set)
/// - Columns that don't exist in the file schema
+///
+/// Struct field access via `get_field` is supported when the resolved leaf
type
+/// is primitive (e.g. `get_field(struct_col, 'field') > 5`).
struct PushdownChecker<'schema> {
/// Does the expression require any non-primitive columns (like structs)?
non_primitive_columns: bool,
@@ -294,6 +308,42 @@ impl<'schema> PushdownChecker<'schema> {
}
}
+ /// Checks whether a struct's root column exists in the file schema and,
if so,
+ /// records its index so the entire struct is decoded for filter
evaluation.
+ ///
+ /// This is called when we see a `get_field` expression that resolves to a
+ /// primitive leaf type. We only need the *root* column index because the
+ /// Parquet reader decodes all leaves of a struct together.
+ ///
+ /// # Example
+ ///
+ /// Given file schema `{a: Int32, s: Struct(foo: Utf8, bar: Int64)}` and
the
+ /// expression `get_field(s, 'foo') = 'hello'`:
+ ///
+ /// - `column_name` = `"s"` (the root struct column)
+ /// - `file_schema.index_of("s")` returns `1`
+ /// - We push `1` into `required_columns`
+ /// - Return `None` (no issue — traversal continues in the caller)
+ ///
+ /// If `"s"` is not in the file schema (e.g. a projected-away column), we
set
+ /// `projected_columns = true` and return `Jump` to skip the subtree.
+ fn check_struct_field_column(
+ &mut self,
+ column_name: &str,
+ ) -> Option<TreeNodeRecursion> {
+ let idx = match self.file_schema.index_of(column_name) {
+ Ok(idx) => idx,
+ Err(_) => {
+ self.projected_columns = true;
+ return Some(TreeNodeRecursion::Jump);
+ }
+ };
+
+ self.required_columns.push(idx);
+
+ None
+ }
+
fn check_single_column(&mut self, column_name: &str) ->
Option<TreeNodeRecursion> {
let idx = match self.file_schema.index_of(column_name) {
Ok(idx) => idx,
@@ -368,6 +418,47 @@ impl TreeNodeVisitor<'_> for PushdownChecker<'_> {
type Node = Arc<dyn PhysicalExpr>;
fn f_down(&mut self, node: &Self::Node) -> Result<TreeNodeRecursion> {
+ // Handle struct field access like `s['foo']['bar'] > 10`.
+ //
+ // DataFusion represents nested field access as
`get_field(Column("s"), "foo")`
+ // (or chained: `get_field(get_field(Column("s"), "foo"), "bar")`).
+ //
+ // We intercept the outermost `get_field` on the way *down* the tree so
+ // the visitor never reaches the raw `Column("s")` node. Without this,
+ // `check_single_column` would see that `s` is a Struct and reject it.
+ //
+ // The strategy:
+ // 1. Match `get_field` whose first arg is a `Column` (the struct
root).
+ // 2. Check that the *resolved* return type is primitive — meaning
we've
+ // drilled all the way to a leaf (e.g. `s['foo']` → Utf8).
+ // 3. Record the root column index via `check_struct_field_column`
and
+ // return `Jump` to skip visiting the children (the Column and the
+ // literal field-name args), since we've already handled them.
+ //
+ // If the return type is still nested (e.g. `s['nested_struct']` →
Struct),
+ // we fall through and let normal traversal continue, which will
+ // eventually reject the expression when it hits the struct Column.
+ if let Some(func) =
+
ScalarFunctionExpr::try_downcast_func::<GetFieldFunc>(node.as_ref())
+ {
+ let args = func.args();
+
+ if let Some(column) = args
+ .first()
+ .and_then(|a| a.as_any().downcast_ref::<Column>())
+ {
+ let return_type = func.return_type();
+ if !DataType::is_nested(return_type) {
+ if let Some(recursion) =
self.check_struct_field_column(column.name())
+ {
+ return Ok(recursion);
+ }
+
+ return Ok(TreeNodeRecursion::Jump);
+ }
+ }
+ }
+
if let Some(column) = node.as_any().downcast_ref::<Column>()
&& let Some(recursion) = self.check_single_column(column.name())
{
@@ -434,7 +525,8 @@ fn leaf_indices_for_roots(
/// - Exist in the provided schema
/// - Are primitive types OR list columns with supported predicates
/// (e.g., `array_has`, `array_has_all`, `array_has_any`, IS NULL, IS NOT
NULL)
-/// - Struct columns are not supported and will prevent pushdown
+/// - Are struct columns accessed via `get_field` where the leaf type is
primitive
+/// - Direct references to whole struct columns will prevent pushdown
///
/// # Arguments
/// * `expr` - The filter expression to check
@@ -1146,6 +1238,228 @@ mod test {
);
}
+ /// get_field(struct_col, 'a') on a struct with a primitive leaf should
allow pushdown.
+ #[test]
+ fn get_field_on_struct_allows_pushdown() {
+ let table_schema = Arc::new(Schema::new(vec![Field::new(
+ "struct_col",
+ DataType::Struct(
+ vec![Arc::new(Field::new("a", DataType::Int32, true))].into(),
+ ),
+ true,
+ )]));
+
+ // get_field(struct_col, 'a') > 5
+ let get_field_expr = datafusion_functions::core::get_field().call(vec![
+ col("struct_col"),
+ Expr::Literal(ScalarValue::Utf8(Some("a".to_string())), None),
+ ]);
+ let expr =
get_field_expr.gt(Expr::Literal(ScalarValue::Int32(Some(5)), None));
+ let expr = logical2physical(&expr, &table_schema);
+
+ assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
+ }
+
+ /// get_field on a struct field that resolves to a nested type should
still block pushdown.
+ #[test]
+ fn get_field_on_nested_leaf_prevents_pushdown() {
+ let inner_struct = DataType::Struct(
+ vec![Arc::new(Field::new("x", DataType::Int32, true))].into(),
+ );
+ let table_schema = Arc::new(Schema::new(vec![Field::new(
+ "struct_col",
+ DataType::Struct(
+ vec![Arc::new(Field::new("nested", inner_struct,
true))].into(),
+ ),
+ true,
+ )]));
+
+ // get_field(struct_col, 'nested') IS NOT NULL — the leaf is still a
struct
+ let get_field_expr = datafusion_functions::core::get_field().call(vec![
+ col("struct_col"),
+ Expr::Literal(ScalarValue::Utf8(Some("nested".to_string())), None),
+ ]);
+ let expr = get_field_expr.is_not_null();
+ let expr = logical2physical(&expr, &table_schema);
+
+ assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
+ }
+
+ /// get_field on a struct produces correct Parquet leaf indices.
+ #[test]
+ fn get_field_filter_candidate_has_correct_leaf_indices() {
+ use arrow::array::{Int32Array, StringArray, StructArray};
+
+ // Schema: id (Int32), s (Struct{value: Int32, label: Utf8})
+ // Parquet leaves: id=0, s.value=1, s.label=2
+ let struct_fields: arrow::datatypes::Fields = vec![
+ Arc::new(Field::new("value", DataType::Int32, false)),
+ Arc::new(Field::new("label", DataType::Utf8, false)),
+ ]
+ .into();
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("id", DataType::Int32, false),
+ Field::new("s", DataType::Struct(struct_fields.clone()), false),
+ ]));
+
+ let batch = RecordBatch::try_new(
+ Arc::clone(&schema),
+ vec![
+ Arc::new(Int32Array::from(vec![1, 2, 3])),
+ Arc::new(StructArray::new(
+ struct_fields,
+ vec![
+ Arc::new(Int32Array::from(vec![10, 20, 30])) as _,
+ Arc::new(StringArray::from(vec!["a", "b", "c"])) as _,
+ ],
+ None,
+ )),
+ ],
+ )
+ .unwrap();
+
+ let file = NamedTempFile::new().expect("temp file");
+ let mut writer =
+ ArrowWriter::try_new(file.reopen().unwrap(), Arc::clone(&schema),
None)
+ .expect("writer");
+ writer.write(&batch).expect("write batch");
+ writer.close().expect("close writer");
+
+ let reader_file = file.reopen().expect("reopen file");
+ let builder = ParquetRecordBatchReaderBuilder::try_new(reader_file)
+ .expect("reader builder");
+ let metadata = builder.metadata().clone();
+ let file_schema = builder.schema().clone();
+
+ // get_field(s, 'value') > 5
+ let get_field_expr = datafusion_functions::core::get_field().call(vec![
+ col("s"),
+ Expr::Literal(ScalarValue::Utf8(Some("value".to_string())), None),
+ ]);
+ let expr =
get_field_expr.gt(Expr::Literal(ScalarValue::Int32(Some(5)), None));
+ let expr = logical2physical(&expr, &file_schema);
+
+ let candidate = FilterCandidateBuilder::new(expr, file_schema)
+ .build(&metadata)
+ .expect("building candidate")
+ .expect("get_field filter on struct should be pushable");
+
+ // The root column is s (Arrow index 1), which expands to Parquet
+ // leaves 1 (s.value) and 2 (s.label).
+ assert_eq!(
+ candidate.projection.leaf_indices,
+ vec![1, 2],
+ "leaf_indices should contain both leaves of struct s"
+ );
+ }
+
+ /// Deeply nested get_field: get_field(struct_col, 'outer', 'inner') where
the
+ /// leaf is primitive should allow pushdown. The logical simplifier
flattens
+ /// nested get_field(get_field(col, 'a'), 'b') into get_field(col, 'a',
'b').
+ #[test]
+ fn get_field_deeply_nested_allows_pushdown() {
+ let table_schema = Arc::new(Schema::new(vec![Field::new(
+ "s",
+ DataType::Struct(
+ vec![Arc::new(Field::new(
+ "outer",
+ DataType::Struct(
+ vec![Arc::new(Field::new("inner", DataType::Int32,
true))].into(),
+ ),
+ true,
+ ))]
+ .into(),
+ ),
+ true,
+ )]));
+
+ // s['outer']['inner'] > 5
+ let get_field_expr = datafusion_functions::core::get_field().call(vec![
+ col("s"),
+ Expr::Literal(ScalarValue::Utf8(Some("outer".to_string())), None),
+ Expr::Literal(ScalarValue::Utf8(Some("inner".to_string())), None),
+ ]);
+ let expr =
get_field_expr.gt(Expr::Literal(ScalarValue::Int32(Some(5)), None));
+ let expr = logical2physical(&expr, &table_schema);
+
+ assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
+ }
+
+ /// End-to-end: deeply nested get_field filter produces correct leaf
indices
+ /// and the filter actually works against a Parquet file.
+ #[test]
+ fn get_field_deeply_nested_filter_candidate() {
+ use arrow::array::{Int32Array, StructArray};
+
+ // Schema: id (Int32), s (Struct{outer: Struct{inner: Int32}})
+ // Parquet leaves: id=0, s.outer.inner=1
+ let inner_fields: arrow::datatypes::Fields =
+ vec![Arc::new(Field::new("inner", DataType::Int32, false))].into();
+ let outer_fields: arrow::datatypes::Fields = vec![Arc::new(Field::new(
+ "outer",
+ DataType::Struct(inner_fields.clone()),
+ false,
+ ))]
+ .into();
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("id", DataType::Int32, false),
+ Field::new("s", DataType::Struct(outer_fields.clone()), false),
+ ]));
+
+ let inner_struct = StructArray::new(
+ inner_fields,
+ vec![Arc::new(Int32Array::from(vec![10, 20, 30])) as _],
+ None,
+ );
+ let outer_struct =
+ StructArray::new(outer_fields, vec![Arc::new(inner_struct) as _],
None);
+ let batch = RecordBatch::try_new(
+ Arc::clone(&schema),
+ vec![
+ Arc::new(Int32Array::from(vec![1, 2, 3])),
+ Arc::new(outer_struct),
+ ],
+ )
+ .unwrap();
+
+ let file = NamedTempFile::new().expect("temp file");
+ let mut writer =
+ ArrowWriter::try_new(file.reopen().unwrap(), Arc::clone(&schema),
None)
+ .expect("writer");
+ writer.write(&batch).expect("write batch");
+ writer.close().expect("close writer");
+
+ let reader_file = file.reopen().expect("reopen file");
+ let builder = ParquetRecordBatchReaderBuilder::try_new(reader_file)
+ .expect("reader builder");
+ let metadata = builder.metadata().clone();
+ let file_schema = builder.schema().clone();
+
+ // Parquet should have 2 leaves: id=0, s.outer.inner=1
+ assert_eq!(metadata.file_metadata().schema_descr().num_columns(), 2);
+
+ // get_field(s, 'outer', 'inner') > 15
+ let get_field_expr = datafusion_functions::core::get_field().call(vec![
+ col("s"),
+ Expr::Literal(ScalarValue::Utf8(Some("outer".to_string())), None),
+ Expr::Literal(ScalarValue::Utf8(Some("inner".to_string())), None),
+ ]);
+ let expr =
get_field_expr.gt(Expr::Literal(ScalarValue::Int32(Some(15)), None));
+ let expr = logical2physical(&expr, &file_schema);
+
+ let candidate = FilterCandidateBuilder::new(expr, file_schema)
+ .build(&metadata)
+ .expect("building candidate")
+ .expect("deeply nested get_field filter should be pushable");
+
+ // Root column is s (Arrow index 1), which has one leaf:
s.outer.inner=1
+ assert_eq!(
+ candidate.projection.leaf_indices,
+ vec![1],
+ "leaf_indices should be [1] for s.outer.inner"
+ );
+ }
+
/// Sanity check that the given expression could be evaluated against the
given schema without any errors.
/// This will fail if the expression references columns that are not in
the schema or if the types of the columns are incompatible, etc.
fn check_expression_can_evaluate_against_schema(
diff --git a/datafusion/sqllogictest/test_files/projection_pushdown.slt
b/datafusion/sqllogictest/test_files/projection_pushdown.slt
index 1c89923080..0161bf4118 100644
--- a/datafusion/sqllogictest/test_files/projection_pushdown.slt
+++ b/datafusion/sqllogictest/test_files/projection_pushdown.slt
@@ -292,7 +292,7 @@ logical_plan
physical_plan
01)ProjectionExec: expr=[id@0 as id, __datafusion_extracted_2@1 as
simple_struct.s[label]]
02)--FilterExec: __datafusion_extracted_1@0 > 150, projection=[id@1,
__datafusion_extracted_2@2]
-03)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[get_field(s@1, value) as __datafusion_extracted_1, id,
get_field(s@1, label) as __datafusion_extracted_2], file_type=parquet
+03)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[get_field(s@1, value) as __datafusion_extracted_1, id,
get_field(s@1, label) as __datafusion_extracted_2], file_type=parquet,
predicate=get_field(s@1, value) > 150
# Verify correctness
query IT
@@ -595,7 +595,7 @@ physical_plan
01)SortExec: TopK(fetch=2), expr=[simple_struct.s[value]@1 ASC NULLS LAST],
preserve_partitioning=[false]
02)--ProjectionExec: expr=[id@1 as id, __datafusion_extracted_1@0 as
simple_struct.s[value]]
03)----FilterExec: id@1 > 1
-04)------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[get_field(s@1, value) as __datafusion_extracted_1, id],
file_type=parquet, predicate=id@0 > 1, pruning_predicate=id_null_count@1 !=
row_count@2 AND id_max@0 > 1, required_guarantees=[]
+04)------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[get_field(s@1, value) as __datafusion_extracted_1, id],
file_type=parquet, predicate=id@0 > 1 AND DynamicFilter [ empty ],
pruning_predicate=id_null_count@1 != row_count@2 AND id_max@0 > 1,
required_guarantees=[]
# Verify correctness
query II
@@ -848,7 +848,7 @@ logical_plan
physical_plan
01)ProjectionExec: expr=[id@0 as id, __datafusion_extracted_2@1 as
nullable_struct.s[label]]
02)--FilterExec: __datafusion_extracted_1@0 IS NOT NULL, projection=[id@1,
__datafusion_extracted_2@2]
-03)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/nullable.parquet]]},
projection=[get_field(s@1, value) as __datafusion_extracted_1, id,
get_field(s@1, label) as __datafusion_extracted_2], file_type=parquet
+03)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/nullable.parquet]]},
projection=[get_field(s@1, value) as __datafusion_extracted_1, id,
get_field(s@1, label) as __datafusion_extracted_2], file_type=parquet,
predicate=get_field(s@1, value) IS NOT NULL
# Verify correctness
query IT
@@ -1223,7 +1223,7 @@ logical_plan
physical_plan
01)ProjectionExec: expr=[__datafusion_extracted_2@0 as simple_struct.s[value]]
02)--FilterExec: character_length(__datafusion_extracted_1@0) > 4,
projection=[__datafusion_extracted_2@1]
-03)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[get_field(s@1, label) as __datafusion_extracted_1, get_field(s@1,
value) as __datafusion_extracted_2], file_type=parquet
+03)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[get_field(s@1, label) as __datafusion_extracted_1, get_field(s@1,
value) as __datafusion_extracted_2], file_type=parquet,
predicate=character_length(get_field(s@1, label)) > 4
# Verify correctness - filter on rows where label length > 4 (all have length
5, except 'one' has 3)
# Wait, from the data: alpha(5), beta(4), gamma(5), delta(5), epsilon(7)
@@ -1314,7 +1314,7 @@ logical_plan
physical_plan
01)ProjectionExec: expr=[id@0 as id]
02)--SortExec: TopK(fetch=2), expr=[__datafusion_extracted_1@1 ASC NULLS
LAST], preserve_partitioning=[false]
-03)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[id, get_field(s@1, value) as __datafusion_extracted_1],
file_type=parquet
+03)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[id, get_field(s@1, value) as __datafusion_extracted_1],
file_type=parquet, predicate=DynamicFilter [ empty ]
# Verify correctness
query I
@@ -1421,7 +1421,7 @@ logical_plan
physical_plan
01)HashJoinExec: mode=CollectLeft, join_type=Inner,
on=[(__datafusion_extracted_1@0, __datafusion_extracted_2 * Int64(10)@2)],
projection=[id@1, id@3]
02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[get_field(s@1, value) as __datafusion_extracted_1, id],
file_type=parquet
-03)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/join_right.parquet]]},
projection=[get_field(s@1, level) as __datafusion_extracted_2, id,
get_field(s@1, level) * 10 as __datafusion_extracted_2 * Int64(10)],
file_type=parquet
+03)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/join_right.parquet]]},
projection=[get_field(s@1, level) as __datafusion_extracted_2, id,
get_field(s@1, level) * 10 as __datafusion_extracted_2 * Int64(10)],
file_type=parquet, predicate=DynamicFilter [ empty ]
# Verify correctness - value = level * 10
# simple_struct: (1,100), (2,200), (3,150), (4,300), (5,250)
@@ -1456,7 +1456,7 @@ logical_plan
physical_plan
01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, id@0)]
02)--FilterExec: __datafusion_extracted_1@0 > 150, projection=[id@1]
-03)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[get_field(s@1, value) as __datafusion_extracted_1, id],
file_type=parquet
+03)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[get_field(s@1, value) as __datafusion_extracted_1, id],
file_type=parquet, predicate=get_field(s@1, value) > 150
04)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/join_right.parquet]]},
projection=[id], file_type=parquet, predicate=DynamicFilter [ empty ]
# Verify correctness - id matches and value > 150
@@ -1495,9 +1495,9 @@ logical_plan
physical_plan
01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, id@0)]
02)--FilterExec: __datafusion_extracted_1@0 > 100, projection=[id@1]
-03)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[get_field(s@1, value) as __datafusion_extracted_1, id],
file_type=parquet
+03)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[get_field(s@1, value) as __datafusion_extracted_1, id],
file_type=parquet, predicate=get_field(s@1, value) > 100
04)--FilterExec: __datafusion_extracted_2@0 > 3, projection=[id@1]
-05)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/join_right.parquet]]},
projection=[get_field(s@1, level) as __datafusion_extracted_2, id],
file_type=parquet, predicate=DynamicFilter [ empty ]
+05)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/join_right.parquet]]},
projection=[get_field(s@1, level) as __datafusion_extracted_2, id],
file_type=parquet, predicate=get_field(s@1, level) > 3 AND DynamicFilter [
empty ]
# Verify correctness - id matches, value > 100, and level > 3
# Matching ids where value > 100: 2(200), 3(150), 4(300), 5(250)
@@ -1604,7 +1604,7 @@ physical_plan
02)--HashJoinExec: mode=CollectLeft, join_type=Left, on=[(id@1, id@0)],
projection=[__datafusion_extracted_2@0, id@1, __datafusion_extracted_3@3]
03)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[get_field(s@1, value) as __datafusion_extracted_2, id],
file_type=parquet
04)----FilterExec: __datafusion_extracted_1@0 > 5, projection=[id@1,
__datafusion_extracted_3@2]
-05)------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/join_right.parquet]]},
projection=[get_field(s@1, level) as __datafusion_extracted_1, id,
get_field(s@1, level) as __datafusion_extracted_3], file_type=parquet,
predicate=DynamicFilter [ empty ]
+05)------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/join_right.parquet]]},
projection=[get_field(s@1, level) as __datafusion_extracted_1, id,
get_field(s@1, level) as __datafusion_extracted_3], file_type=parquet,
predicate=get_field(s@1, level) > 5 AND DynamicFilter [ empty ]
# Verify correctness - left join with level > 5 condition
# Only join_right rows with level > 5 are matched: id=1 (level=10), id=4
(level=8)
@@ -1735,7 +1735,7 @@ logical_plan
05)--------TableScan: simple_struct projection=[id, s],
partial_filters=[get_field(simple_struct.s, Utf8("value")) > Int64(200)]
physical_plan
01)FilterExec: __datafusion_extracted_1@0 > 200, projection=[id@1]
-02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[get_field(s@1, value) as __datafusion_extracted_1, id],
file_type=parquet
+02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[get_field(s@1, value) as __datafusion_extracted_1, id],
file_type=parquet, predicate=get_field(s@1, value) > 200
# Verify correctness
query I
@@ -1981,7 +1981,7 @@ logical_plan
04)------TableScan: t projection=[s], partial_filters=[CASE WHEN
get_field(t.s, Utf8("f1")) IS NOT NULL THEN get_field(t.s, Utf8("f1")) ELSE
get_field(t.s, Utf8("f2")) END = Int64(1)]
physical_plan
01)FilterExec: CASE WHEN __datafusion_extracted_3@0 IS NOT NULL THEN
__datafusion_extracted_3@0 ELSE __datafusion_extracted_4@1 END = 1,
projection=[__datafusion_extracted_2@2]
-02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/test.parquet]]},
projection=[get_field(s@0, f1) as __datafusion_extracted_3, get_field(s@0, f2)
as __datafusion_extracted_4, get_field(s@0, f1) as __datafusion_extracted_2],
file_type=parquet
+02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/test.parquet]]},
projection=[get_field(s@0, f1) as __datafusion_extracted_3, get_field(s@0, f2)
as __datafusion_extracted_4, get_field(s@0, f1) as __datafusion_extracted_2],
file_type=parquet, predicate=CASE WHEN get_field(s@0, f1) IS NOT NULL THEN
get_field(s@0, f1) ELSE get_field(s@0, f2) END = 1
query I
SELECT
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]