This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch branch-49
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/branch-49 by this push:
     new ee28aa7673 [branch-49] Backport #17129 to branch 49 (#17143)
ee28aa7673 is described below

commit ee28aa7673db2234b87117d664559e5857ac8c38
Author: Adam Gutglick <a...@spiraldb.com>
AuthorDate: Tue Aug 12 15:23:33 2025 +0100

    [branch-49] Backport #17129 to branch 49 (#17143)
    
    * Preserve equivalence properties during projection pushdown (#17129)
    
    * Adds parquet data diffs
    
    ---------
    
    Co-authored-by: Matthew Kim <38759997+friendlymatt...@users.noreply.github.com>
---
 datafusion/datasource/src/source.rs                |  35 ++++++++++++++++++++-
 datafusion/sqllogictest/data/1.parquet             | Bin 0 -> 1381 bytes
 datafusion/sqllogictest/data/2.parquet             | Bin 0 -> 1403 bytes
 .../test_files/parquet_filter_pushdown.slt         |  32 +++++++++++++++++++
 4 files changed, 66 insertions(+), 1 deletion(-)

diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs
index fde1944ae0..3a7ff1ef09 100644
--- a/datafusion/datasource/src/source.rs
+++ b/datafusion/datasource/src/source.rs
@@ -22,6 +22,7 @@ use std::fmt;
 use std::fmt::{Debug, Formatter};
 use std::sync::Arc;
 
+use datafusion_physical_expr::equivalence::ProjectionMapping;
 use datafusion_physical_plan::execution_plan::{
     Boundedness, EmissionType, SchedulingType,
 };
@@ -324,7 +325,39 @@ impl ExecutionPlan for DataSourceExec {
         &self,
         projection: &ProjectionExec,
     ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
-        self.data_source.try_swapping_with_projection(projection)
+        match self.data_source.try_swapping_with_projection(projection)? {
+            Some(new_plan) => {
+                if let Some(new_data_source_exec) =
+                    new_plan.as_any().downcast_ref::<DataSourceExec>()
+                {
+                    let projection_mapping = ProjectionMapping::try_new(
+                        projection.expr().iter().cloned(),
+                        &self.schema(),
+                    )?;
+
+                    // Project the equivalence properties to the new schema
+                    let projected_eq_properties = self
+                        .cache
+                        .eq_properties
+                        .project(&projection_mapping, 
new_data_source_exec.schema());
+
+                    let preserved_exec = DataSourceExec {
+                        data_source: 
Arc::clone(&new_data_source_exec.data_source),
+                        cache: PlanProperties::new(
+                            projected_eq_properties,
+                            new_data_source_exec.cache.partitioning.clone(),
+                            new_data_source_exec.cache.emission_type,
+                            new_data_source_exec.cache.boundedness,
+                        )
+                        
.with_scheduling_type(new_data_source_exec.cache.scheduling_type),
+                    };
+                    Ok(Some(Arc::new(preserved_exec)))
+                } else {
+                    Ok(Some(new_plan))
+                }
+            }
+            None => Ok(None),
+        }
     }
 
     fn handle_child_pushdown_result(
diff --git a/datafusion/sqllogictest/data/1.parquet b/datafusion/sqllogictest/data/1.parquet
new file mode 100644
index 0000000000..a04f669eae
Binary files /dev/null and b/datafusion/sqllogictest/data/1.parquet differ
diff --git a/datafusion/sqllogictest/data/2.parquet b/datafusion/sqllogictest/data/2.parquet
new file mode 100644
index 0000000000..b5e29f81ba
Binary files /dev/null and b/datafusion/sqllogictest/data/2.parquet differ
diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
index 24e76a570c..61f4d6fc12 100644
--- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
+++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
@@ -528,3 +528,35 @@ query TT
 select val, part from t_pushdown where part = val AND part = 'a';
 ----
 a a
+
+statement ok
+COPY (
+    SELECT
+        '00000000000000000000000000000001' AS trace_id,
+        '2023-10-01 00:00:00'::timestamptz AS start_timestamp,
+        'prod' as deployment_environment
+)
+TO 'data/1.parquet';
+
+statement ok
+COPY (
+    SELECT
+        '00000000000000000000000000000002' AS trace_id,
+        '2024-10-01 00:00:00'::timestamptz AS start_timestamp,
+        'staging' as deployment_environment
+)
+TO 'data/2.parquet';
+
+statement ok
+CREATE EXTERNAL TABLE t1 STORED AS PARQUET LOCATION 'data/';
+
+statement ok
+SET datafusion.execution.parquet.pushdown_filters = true;
+
+query T
+SELECT deployment_environment
+FROM t1
+WHERE trace_id = '00000000000000000000000000000002'
+ORDER BY start_timestamp, trace_id;
+----
+staging


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to