This is an automated email from the ASF dual-hosted git repository. alamb pushed a commit to branch branch-49 in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/branch-49 by this push: new ee28aa7673 [branch-49] Backport #17129 to branch 49 (#17143) ee28aa7673 is described below commit ee28aa7673db2234b87117d664559e5857ac8c38 Author: Adam Gutglick <a...@spiraldb.com> AuthorDate: Tue Aug 12 15:23:33 2025 +0100 [branch-49] Backport #17129 to branch 49 (#17143) * Preserve equivalence properties during projection pushdown (#17129) * Adds parquet data diffs --------- Co-authored-by: Matthew Kim <38759997+friendlymatt...@users.noreply.github.com> --- datafusion/datasource/src/source.rs | 35 ++++++++++++++++++++- datafusion/sqllogictest/data/1.parquet | Bin 0 -> 1381 bytes datafusion/sqllogictest/data/2.parquet | Bin 0 -> 1403 bytes .../test_files/parquet_filter_pushdown.slt | 32 +++++++++++++++++++ 4 files changed, 66 insertions(+), 1 deletion(-) diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index fde1944ae0..3a7ff1ef09 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -22,6 +22,7 @@ use std::fmt; use std::fmt::{Debug, Formatter}; use std::sync::Arc; +use datafusion_physical_expr::equivalence::ProjectionMapping; use datafusion_physical_plan::execution_plan::{ Boundedness, EmissionType, SchedulingType, }; @@ -324,7 +325,39 @@ impl ExecutionPlan for DataSourceExec { &self, projection: &ProjectionExec, ) -> Result<Option<Arc<dyn ExecutionPlan>>> { - self.data_source.try_swapping_with_projection(projection) + match self.data_source.try_swapping_with_projection(projection)? { + Some(new_plan) => { + if let Some(new_data_source_exec) = + new_plan.as_any().downcast_ref::<DataSourceExec>() + { + let projection_mapping = ProjectionMapping::try_new( + projection.expr().iter().cloned(), + &self.schema(), + )?; + + // Project the equivalence properties to the new schema + let projected_eq_properties = self + .cache + .eq_properties + .project(&projection_mapping, new_data_source_exec.schema()); + + let preserved_exec = DataSourceExec { + data_source: Arc::clone(&new_data_source_exec.data_source), + cache: PlanProperties::new( + projected_eq_properties, + new_data_source_exec.cache.partitioning.clone(), + new_data_source_exec.cache.emission_type, + new_data_source_exec.cache.boundedness, + ) + .with_scheduling_type(new_data_source_exec.cache.scheduling_type), + }; + Ok(Some(Arc::new(preserved_exec))) + } else { + Ok(Some(new_plan)) + } + } + None => Ok(None), + } } fn handle_child_pushdown_result( diff --git a/datafusion/sqllogictest/data/1.parquet b/datafusion/sqllogictest/data/1.parquet new file mode 100644 index 0000000000..a04f669eae Binary files /dev/null and b/datafusion/sqllogictest/data/1.parquet differ diff --git a/datafusion/sqllogictest/data/2.parquet b/datafusion/sqllogictest/data/2.parquet new file mode 100644 index 0000000000..b5e29f81ba Binary files /dev/null and b/datafusion/sqllogictest/data/2.parquet differ diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt index 24e76a570c..61f4d6fc12 100644 --- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt +++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt @@ -528,3 +528,35 @@ query TT select val, part from t_pushdown where part = val AND part = 'a'; ---- a a + +statement ok +COPY ( + SELECT + '00000000000000000000000000000001' AS trace_id, + '2023-10-01 00:00:00'::timestamptz AS start_timestamp, + 'prod' as deployment_environment +) +TO 'data/1.parquet'; + +statement ok +COPY ( + SELECT + '00000000000000000000000000000002' AS trace_id, + '2024-10-01 00:00:00'::timestamptz AS start_timestamp, + 'staging' as deployment_environment +) +TO 'data/2.parquet'; + +statement ok +CREATE EXTERNAL TABLE t1 STORED AS PARQUET LOCATION 'data/'; + +statement ok +SET datafusion.execution.parquet.pushdown_filters = true; + +query T +SELECT deployment_environment +FROM t1 +WHERE trace_id = '00000000000000000000000000000002' +ORDER BY start_timestamp, trace_id; +---- +staging --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org