This is an automated email from the ASF dual-hosted git repository.

adriangb pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion.git

The following commit(s) were added to refs/heads/main by this push:
     new 62f214fede fix: ignore non-existent columns when adding filter equivalence info in `FileScanConfig` (#17546)
62f214fede is described below

commit 62f214fede28070519fe0494d0ec34d4f296a92a
Author: Rohan Krishnaswamy <47869999+rkris...@users.noreply.github.com>
AuthorDate: Mon Sep 15 20:10:56 2025 -0700

    fix: ignore non-existent columns when adding filter equivalence info in `FileScanConfig` (#17546)
---
 datafusion/datasource/src/file_scan_config.rs | 73 ++++++++++++++++++++++++++-
 datafusion/datasource/src/test_util.rs        | 12 +++++
 2 files changed, 83 insertions(+), 2 deletions(-)

diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs
index 82d7057a98..4e2235eae8 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -590,7 +590,11 @@ impl DataSource for FileScanConfig {
             // Note that this will *ignore* any non-projected columns: these don't factor into ordering / equivalence.
             match reassign_predicate_columns(filter, &schema, true) {
                 Ok(filter) => {
-                    match Self::add_filter_equivalence_info(filter, &mut eq_properties) {
+                    match Self::add_filter_equivalence_info(
+                        filter,
+                        &mut eq_properties,
+                        &schema,
+                    ) {
                         Ok(()) => {}
                         Err(e) => {
                             warn!("Failed to add filter equivalence info: {e}");
@@ -758,9 +762,24 @@ impl FileScanConfig {
     fn add_filter_equivalence_info(
         filter: Arc<dyn PhysicalExpr>,
         eq_properties: &mut EquivalenceProperties,
+        schema: &Schema,
     ) -> Result<()> {
+        macro_rules! ignore_dangling_col {
+            ($col:expr) => {
+                if let Some(col) = $col.as_any().downcast_ref::<Column>() {
+                    if schema.index_of(col.name()).is_err() {
+                        continue;
+                    }
+                }
+            };
+        }
+
         let (equal_pairs, _) = collect_columns_from_predicate(&filter);
         for (lhs, rhs) in equal_pairs {
+            // Ignore any binary expressions that reference non-existent columns in the current schema
+            // (e.g. due to unnecessary projections being removed)
+            ignore_dangling_col!(lhs);
+            ignore_dangling_col!(rhs);
             eq_properties.add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?
         }
         Ok(())
@@ -1449,6 +1468,7 @@ pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::test_util::col;
     use crate::{
         generate_test_files, test_util::MockSource, tests::aggr_test_schema,
         verify_sort_integrity,
@@ -1457,8 +1477,9 @@ mod tests {
     use arrow::array::{Int32Array, RecordBatch};
     use datafusion_common::stats::Precision;
     use datafusion_common::{assert_batches_eq, internal_err};
-    use datafusion_expr::SortExpr;
+    use datafusion_expr::{Operator, SortExpr};
     use datafusion_physical_expr::create_physical_sort_expr;
+    use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
 
     /// Returns the column names on the schema
     pub fn columns(schema: &Schema) -> Vec<String> {
@@ -2214,6 +2235,54 @@ mod tests {
         assert_eq!(config.output_ordering.len(), 1);
     }
 
+    #[test]
+    fn equivalence_properties_after_schema_change() {
+        let file_schema = aggr_test_schema();
+        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
+        // Create a file source with a filter
+        let file_source: Arc<dyn FileSource> =
+            Arc::new(MockSource::default().with_filter(Arc::new(BinaryExpr::new(
+                col("c2", &file_schema).unwrap(),
+                Operator::Eq,
+                Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
+            ))));
+
+        let config = FileScanConfigBuilder::new(
+            object_store_url.clone(),
+            Arc::clone(&file_schema),
+            Arc::clone(&file_source),
+        )
+        .with_projection(Some(vec![0, 1, 2]))
+        .build();
+
+        // Simulate projection being updated. Since the filter has already been pushed down,
+        // the new projection won't include the filtered column.
+        let data_source = config
+            .try_swapping_with_projection(&[ProjectionExpr::new(
+                col("c3", &file_schema).unwrap(),
+                "c3".to_string(),
+            )])
+            .unwrap()
+            .unwrap();
+
+        // Gather the equivalence properties from the new data source. There should
+        // be no equivalence class for column c2 since it was removed by the projection.
+        let eq_properties = data_source.eq_properties();
+        let eq_group = eq_properties.eq_group();
+
+        for class in eq_group.iter() {
+            for expr in class.iter() {
+                if let Some(col) = expr.as_any().downcast_ref::<Column>() {
+                    assert_ne!(
+                        col.name(),
+                        "c2",
+                        "c2 should not be present in any equivalence class"
+                    );
+                }
+            }
+        }
+    }
+
     #[test]
     fn test_file_scan_config_builder_defaults() {
         let file_schema = aggr_test_schema();
diff --git a/datafusion/datasource/src/test_util.rs b/datafusion/datasource/src/test_util.rs
index e4a5114aa0..f0aff1fa62 100644
--- a/datafusion/datasource/src/test_util.rs
+++ b/datafusion/datasource/src/test_util.rs
@@ -34,6 +34,14 @@ pub(crate) struct MockSource {
     metrics: ExecutionPlanMetricsSet,
     projected_statistics: Option<Statistics>,
     schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
+    filter: Option<Arc<dyn PhysicalExpr>>,
+}
+
+impl MockSource {
+    pub fn with_filter(mut self, filter: Arc<dyn PhysicalExpr>) -> Self {
+        self.filter = Some(filter);
+        self
+    }
 }
 
 impl FileSource for MockSource {
@@ -50,6 +58,10 @@ impl FileSource for MockSource {
         self
     }
 
+    fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
+        self.filter.clone()
+    }
+
     fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
         Arc::new(Self { ..self.clone() })
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org