This is an automated email from the ASF dual-hosted git repository.

adriangb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 62f214fede fix: ignore non-existent columns when adding filter equivalence info in `FileScanConfig` (#17546)
62f214fede is described below

commit 62f214fede28070519fe0494d0ec34d4f296a92a
Author: Rohan Krishnaswamy <47869999+rkris...@users.noreply.github.com>
AuthorDate: Mon Sep 15 20:10:56 2025 -0700

    fix: ignore non-existent columns when adding filter equivalence info in `FileScanConfig` (#17546)
---
 datafusion/datasource/src/file_scan_config.rs | 73 ++++++++++++++++++++++++++-
 datafusion/datasource/src/test_util.rs        | 12 +++++
 2 files changed, 83 insertions(+), 2 deletions(-)

diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs
index 82d7057a98..4e2235eae8 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -590,7 +590,11 @@ impl DataSource for FileScanConfig {
             // Note that this will *ignore* any non-projected columns: these 
don't factor into ordering / equivalence.
             match reassign_predicate_columns(filter, &schema, true) {
                 Ok(filter) => {
-                    match Self::add_filter_equivalence_info(filter, &mut 
eq_properties) {
+                    match Self::add_filter_equivalence_info(
+                        filter,
+                        &mut eq_properties,
+                        &schema,
+                    ) {
                         Ok(()) => {}
                         Err(e) => {
                             warn!("Failed to add filter equivalence info: 
{e}");
@@ -758,9 +762,24 @@ impl FileScanConfig {
     fn add_filter_equivalence_info(
         filter: Arc<dyn PhysicalExpr>,
         eq_properties: &mut EquivalenceProperties,
+        schema: &Schema,
     ) -> Result<()> {
+        macro_rules! ignore_dangling_col {
+            ($col:expr) => {
+                if let Some(col) = $col.as_any().downcast_ref::<Column>() {
+                    if schema.index_of(col.name()).is_err() {
+                        continue;
+                    }
+                }
+            };
+        }
+
         let (equal_pairs, _) = collect_columns_from_predicate(&filter);
         for (lhs, rhs) in equal_pairs {
+            // Ignore any binary expressions that reference non-existent 
columns in the current schema
+            // (e.g. due to unnecessary projections being removed)
+            ignore_dangling_col!(lhs);
+            ignore_dangling_col!(rhs);
             eq_properties.add_equal_conditions(Arc::clone(lhs), 
Arc::clone(rhs))?
         }
         Ok(())
@@ -1449,6 +1468,7 @@ pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::test_util::col;
     use crate::{
         generate_test_files, test_util::MockSource, tests::aggr_test_schema,
         verify_sort_integrity,
@@ -1457,8 +1477,9 @@ mod tests {
     use arrow::array::{Int32Array, RecordBatch};
     use datafusion_common::stats::Precision;
     use datafusion_common::{assert_batches_eq, internal_err};
-    use datafusion_expr::SortExpr;
+    use datafusion_expr::{Operator, SortExpr};
     use datafusion_physical_expr::create_physical_sort_expr;
+    use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
 
     /// Returns the column names on the schema
     pub fn columns(schema: &Schema) -> Vec<String> {
@@ -2214,6 +2235,54 @@ mod tests {
         assert_eq!(config.output_ordering.len(), 1);
     }
 
+    #[test]
+    fn equivalence_properties_after_schema_change() {
+        let file_schema = aggr_test_schema();
+        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
+        // Create a file source with a filter
+        let file_source: Arc<dyn FileSource> =
+            
Arc::new(MockSource::default().with_filter(Arc::new(BinaryExpr::new(
+                col("c2", &file_schema).unwrap(),
+                Operator::Eq,
+                Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
+            ))));
+
+        let config = FileScanConfigBuilder::new(
+            object_store_url.clone(),
+            Arc::clone(&file_schema),
+            Arc::clone(&file_source),
+        )
+        .with_projection(Some(vec![0, 1, 2]))
+        .build();
+
+        // Simulate projection being updated. Since the filter has already 
been pushed down,
+        // the new projection won't include the filtered column.
+        let data_source = config
+            .try_swapping_with_projection(&[ProjectionExpr::new(
+                col("c3", &file_schema).unwrap(),
+                "c3".to_string(),
+            )])
+            .unwrap()
+            .unwrap();
+
+        // Gather the equivalence properties from the new data source. There 
should
+        // be no equivalence class for column c2 since it was removed by the 
projection.
+        let eq_properties = data_source.eq_properties();
+        let eq_group = eq_properties.eq_group();
+
+        for class in eq_group.iter() {
+            for expr in class.iter() {
+                if let Some(col) = expr.as_any().downcast_ref::<Column>() {
+                    assert_ne!(
+                        col.name(),
+                        "c2",
+                        "c2 should not be present in any equivalence class"
+                    );
+                }
+            }
+        }
+    }
+
     #[test]
     fn test_file_scan_config_builder_defaults() {
         let file_schema = aggr_test_schema();
diff --git a/datafusion/datasource/src/test_util.rs b/datafusion/datasource/src/test_util.rs
index e4a5114aa0..f0aff1fa62 100644
--- a/datafusion/datasource/src/test_util.rs
+++ b/datafusion/datasource/src/test_util.rs
@@ -34,6 +34,14 @@ pub(crate) struct MockSource {
     metrics: ExecutionPlanMetricsSet,
     projected_statistics: Option<Statistics>,
     schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
+    filter: Option<Arc<dyn PhysicalExpr>>,
+}
+
+impl MockSource {
+    pub fn with_filter(mut self, filter: Arc<dyn PhysicalExpr>) -> Self {
+        self.filter = Some(filter);
+        self
+    }
 }
 
 impl FileSource for MockSource {
@@ -50,6 +58,10 @@ impl FileSource for MockSource {
         self
     }
 
+    fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
+        self.filter.clone()
+    }
+
     fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
         Arc::new(Self { ..self.clone() })
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to