This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch branch-50
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/branch-50 by this push:
new 891202abf9 fix: ignore non-existent columns when adding filter
equivalence info in `FileScanConfig` (#17546) (#17600)
891202abf9 is described below
commit 891202abf962e1507d58d3100671ab11d7f4708f
Author: Rohan Krishnaswamy <[email protected]>
AuthorDate: Tue Sep 16 10:31:42 2025 -0700
fix: ignore non-existent columns when adding filter equivalence info in
`FileScanConfig` (#17546) (#17600)
---
datafusion/datasource/src/file_scan_config.rs | 73 ++++++++++++++++++++++++++-
datafusion/datasource/src/test_util.rs | 12 +++++
2 files changed, 83 insertions(+), 2 deletions(-)
diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs
index 82d7057a98..4e2235eae8 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -590,7 +590,11 @@ impl DataSource for FileScanConfig {
// Note that this will *ignore* any non-projected columns: these don't factor into ordering / equivalence.
match reassign_predicate_columns(filter, &schema, true) {
Ok(filter) => {
- match Self::add_filter_equivalence_info(filter, &mut eq_properties) {
+ match Self::add_filter_equivalence_info(
+ filter,
+ &mut eq_properties,
+ &schema,
+ ) {
Ok(()) => {}
Err(e) => {
warn!("Failed to add filter equivalence info: {e}");
@@ -758,9 +762,24 @@ impl FileScanConfig {
fn add_filter_equivalence_info(
filter: Arc<dyn PhysicalExpr>,
eq_properties: &mut EquivalenceProperties,
+ schema: &Schema,
) -> Result<()> {
+ macro_rules! ignore_dangling_col {
+ ($col:expr) => {
+ if let Some(col) = $col.as_any().downcast_ref::<Column>() {
+ if schema.index_of(col.name()).is_err() {
+ continue;
+ }
+ }
+ };
+ }
+
let (equal_pairs, _) = collect_columns_from_predicate(&filter);
for (lhs, rhs) in equal_pairs {
+ // Ignore any binary expressions that reference non-existent columns in the current schema
+ // (e.g. due to unnecessary projections being removed)
+ ignore_dangling_col!(lhs);
+ ignore_dangling_col!(rhs);
eq_properties.add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?
}
Ok(())
@@ -1449,6 +1468,7 @@ pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
#[cfg(test)]
mod tests {
use super::*;
+ use crate::test_util::col;
use crate::{
generate_test_files, test_util::MockSource, tests::aggr_test_schema,
verify_sort_integrity,
@@ -1457,8 +1477,9 @@ mod tests {
use arrow::array::{Int32Array, RecordBatch};
use datafusion_common::stats::Precision;
use datafusion_common::{assert_batches_eq, internal_err};
- use datafusion_expr::SortExpr;
+ use datafusion_expr::{Operator, SortExpr};
use datafusion_physical_expr::create_physical_sort_expr;
+ use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
/// Returns the column names on the schema
pub fn columns(schema: &Schema) -> Vec<String> {
@@ -2214,6 +2235,54 @@ mod tests {
assert_eq!(config.output_ordering.len(), 1);
}
+ #[test]
+ fn equivalence_properties_after_schema_change() {
+ let file_schema = aggr_test_schema();
+ let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
+ // Create a file source with a filter
+ let file_source: Arc<dyn FileSource> =
+ Arc::new(MockSource::default().with_filter(Arc::new(BinaryExpr::new(
+ col("c2", &file_schema).unwrap(),
+ Operator::Eq,
+ Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
+ ))));
+
+ let config = FileScanConfigBuilder::new(
+ object_store_url.clone(),
+ Arc::clone(&file_schema),
+ Arc::clone(&file_source),
+ )
+ .with_projection(Some(vec![0, 1, 2]))
+ .build();
+
+ // Simulate projection being updated. Since the filter has already been pushed down,
+ // the new projection won't include the filtered column.
+ let data_source = config
+ .try_swapping_with_projection(&[ProjectionExpr::new(
+ col("c3", &file_schema).unwrap(),
+ "c3".to_string(),
+ )])
+ .unwrap()
+ .unwrap();
+
+ // Gather the equivalence properties from the new data source. There should
+ // be no equivalence class for column c2 since it was removed by the projection.
+ let eq_properties = data_source.eq_properties();
+ let eq_group = eq_properties.eq_group();
+
+ for class in eq_group.iter() {
+ for expr in class.iter() {
+ if let Some(col) = expr.as_any().downcast_ref::<Column>() {
+ assert_ne!(
+ col.name(),
+ "c2",
+ "c2 should not be present in any equivalence class"
+ );
+ }
+ }
+ }
+ }
+
#[test]
fn test_file_scan_config_builder_defaults() {
let file_schema = aggr_test_schema();
diff --git a/datafusion/datasource/src/test_util.rs b/datafusion/datasource/src/test_util.rs
index e4a5114aa0..f0aff1fa62 100644
--- a/datafusion/datasource/src/test_util.rs
+++ b/datafusion/datasource/src/test_util.rs
@@ -34,6 +34,14 @@ pub(crate) struct MockSource {
metrics: ExecutionPlanMetricsSet,
projected_statistics: Option<Statistics>,
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
+ filter: Option<Arc<dyn PhysicalExpr>>,
+}
+
+impl MockSource {
+ pub fn with_filter(mut self, filter: Arc<dyn PhysicalExpr>) -> Self {
+ self.filter = Some(filter);
+ self
+ }
}
impl FileSource for MockSource {
@@ -50,6 +58,10 @@ impl FileSource for MockSource {
self
}
+ fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
+ self.filter.clone()
+ }
+
fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
Arc::new(Self { ..self.clone() })
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]