This is an automated email from the ASF dual-hosted git repository.

berkay pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push: new 71996fba9b make AnalysisContext aware of empty sets to represent certainly false bounds (#14279) 71996fba9b is described below commit 71996fba9bdc797ba142dc41d6e830ae60d8e1ac Author: Burak Şen <buraks...@gmail.com> AuthorDate: Tue Jan 28 17:38:13 2025 +0300 make AnalysisContext aware of empty sets to represent certainly false bounds (#14279) * ready for review * fmt and lint * Apply suggestions from code review Co-authored-by: Mehmet Ozan Kabak <ozanka...@gmail.com> * apply reviews * fix test * Update analysis.rs * Update analysis.rs --------- Co-authored-by: Mehmet Ozan Kabak <ozanka...@gmail.com> Co-authored-by: berkaysynnada <berkay.sa...@synnada.ai> --- datafusion-examples/examples/expr_api.rs | 2 +- datafusion/physical-expr/src/analysis.rs | 185 ++++++++++++++++++++++--------- datafusion/physical-plan/src/filter.rs | 25 +++-- 3 files changed, 150 insertions(+), 62 deletions(-) diff --git a/datafusion-examples/examples/expr_api.rs b/datafusion-examples/examples/expr_api.rs index 6bfde2ebbf..2908edbb75 100644 --- a/datafusion-examples/examples/expr_api.rs +++ b/datafusion-examples/examples/expr_api.rs @@ -270,7 +270,7 @@ fn range_analysis_demo() -> Result<()> { // In this case, we can see that, as expected, `analyze` has figured out // that in this case, `date` must be in the range `['2020-09-01', '2020-10-01']` let expected_range = Interval::try_new(september_1, october_1)?; - assert_eq!(analysis_result.boundaries[0].interval, expected_range); + assert_eq!(analysis_result.boundaries[0].interval, Some(expected_range)); Ok(()) } diff --git a/datafusion/physical-expr/src/analysis.rs b/datafusion/physical-expr/src/analysis.rs index b602a9cba4..ceec21c711 100644 --- a/datafusion/physical-expr/src/analysis.rs +++ b/datafusion/physical-expr/src/analysis.rs @@ -81,8 +81,12 @@ impl AnalysisContext { #[derive(Clone, Debug, PartialEq)] pub struct ExprBoundaries { pub column: Column, - /// Minimum and 
maximum values this expression can have. - pub interval: Interval, + /// Minimum and maximum values this expression can have. A `None` value + /// indicates that evaluating the given column results in an empty set. + /// For example, if the column `a` has values in the range [10, 20], + /// and there is a filter asserting that `a > 50`, then the resulting interval + /// range of `a` will be `None`. + pub interval: Option<Interval>, /// Maximum number of distinct values this expression can produce, if known. pub distinct_count: Precision<usize>, } @@ -118,7 +122,7 @@ impl ExprBoundaries { let column = Column::new(field.name(), col_index); Ok(ExprBoundaries { column, - interval, + interval: Some(interval), distinct_count: col_stats.distinct_count, }) } @@ -133,7 +137,7 @@ impl ExprBoundaries { .map(|(i, field)| { Ok(Self { column: Column::new(field.name(), i), - interval: Interval::make_unbounded(field.data_type())?, + interval: Some(Interval::make_unbounded(field.data_type())?), distinct_count: Precision::Absent, }) }) @@ -161,40 +165,71 @@ pub fn analyze( context: AnalysisContext, schema: &Schema, ) -> Result<AnalysisContext> { - let target_boundaries = context.boundaries; - - let mut graph = ExprIntervalGraph::try_new(Arc::clone(expr), schema)?; - - let columns = collect_columns(expr) - .into_iter() - .map(|c| Arc::new(c) as _) - .collect::<Vec<_>>(); - - let target_expr_and_indices = graph.gather_node_indices(columns.as_slice()); - - let mut target_indices_and_boundaries = target_expr_and_indices + let initial_boundaries = &context.boundaries; + if initial_boundaries .iter() - .filter_map(|(expr, i)| { - target_boundaries.iter().find_map(|bound| { - expr.as_any() - .downcast_ref::<Column>() - .filter(|expr_column| bound.column.eq(*expr_column)) - .map(|_| (*i, bound.interval.clone())) - }) - }) - .collect::<Vec<_>>(); - - match graph - .update_ranges(&mut target_indices_and_boundaries, Interval::CERTAINLY_TRUE)? 
+ .all(|bound| bound.interval.is_none()) { - PropagationResult::Success => { - shrink_boundaries(graph, target_boundaries, target_expr_and_indices) + if initial_boundaries + .iter() + .any(|bound| bound.distinct_count != Precision::Exact(0)) + { + return internal_err!( + "ExprBoundaries has a non-zero distinct count although it represents an empty table" + ); + } + if context.selectivity != Some(0.0) { + return internal_err!( + "AnalysisContext has a non-zero selectivity although it represents an empty table" + ); } - PropagationResult::Infeasible => { - Ok(AnalysisContext::new(target_boundaries).with_selectivity(0.0)) + Ok(context) + } else if initial_boundaries + .iter() + .any(|bound| bound.interval.is_none()) + { + internal_err!( + "AnalysisContext is an inconsistent state. Some columns represent empty table while others don't" + ) + } else { + let mut target_boundaries = context.boundaries; + let mut graph = ExprIntervalGraph::try_new(Arc::clone(expr), schema)?; + let columns = collect_columns(expr) + .into_iter() + .map(|c| Arc::new(c) as _) + .collect::<Vec<_>>(); + + let mut target_indices_and_boundaries = vec![]; + let target_expr_and_indices = graph.gather_node_indices(columns.as_slice()); + + for (expr, index) in &target_expr_and_indices { + if let Some(column) = expr.as_any().downcast_ref::<Column>() { + if let Some(bound) = + target_boundaries.iter().find(|b| b.column == *column) + { + // Now, it's safe to unwrap + target_indices_and_boundaries + .push((*index, bound.interval.as_ref().unwrap().clone())); + } + } } - PropagationResult::CannotPropagate => { - Ok(AnalysisContext::new(target_boundaries).with_selectivity(1.0)) + + match graph + .update_ranges(&mut target_indices_and_boundaries, Interval::CERTAINLY_TRUE)? 
+ { + PropagationResult::Success => { + shrink_boundaries(graph, target_boundaries, target_expr_and_indices) + } + PropagationResult::Infeasible => { + // If the propagation result is infeasible, set intervals to None + target_boundaries + .iter_mut() + .for_each(|bound| bound.interval = None); + Ok(AnalysisContext::new(target_boundaries).with_selectivity(0.0)) + } + PropagationResult::CannotPropagate => { + Ok(AnalysisContext::new(target_boundaries).with_selectivity(1.0)) + } } } } @@ -215,12 +250,12 @@ fn shrink_boundaries( .iter_mut() .find(|bound| bound.column.eq(column)) { - bound.interval = graph.get_interval(*i); + bound.interval = Some(graph.get_interval(*i)); }; } }); - let selectivity = calculate_selectivity(&target_boundaries, &initial_boundaries); + let selectivity = calculate_selectivity(&target_boundaries, &initial_boundaries)?; if !(0.0..=1.0).contains(&selectivity) { return internal_err!("Selectivity is out of limit: {}", selectivity); @@ -235,16 +270,31 @@ fn shrink_boundaries( fn calculate_selectivity( target_boundaries: &[ExprBoundaries], initial_boundaries: &[ExprBoundaries], -) -> f64 { +) -> Result<f64> { // Since the intervals are assumed uniform and the values // are not correlated, we need to multiply the selectivities // of multiple columns to get the overall selectivity. 
- initial_boundaries - .iter() - .zip(target_boundaries.iter()) - .fold(1.0, |acc, (initial, target)| { - acc * cardinality_ratio(&initial.interval, &target.interval) - }) + if target_boundaries.len() != initial_boundaries.len() { + return Err(internal_datafusion_err!( + "The number of columns in the initial and target boundaries should be the same" + )); + } + let mut acc: f64 = 1.0; + for (initial, target) in initial_boundaries.iter().zip(target_boundaries) { + match (initial.interval.as_ref(), target.interval.as_ref()) { + (Some(initial), Some(target)) => { + acc *= cardinality_ratio(initial, target); + } + (None, Some(_)) => { + return internal_err!( + "Initial boundary cannot be None while having a Some() target boundary" + ); + } + _ => return Ok(0.0), + } + } + + Ok(acc) } #[cfg(test)] @@ -313,16 +363,6 @@ mod tests { Some(16), Some(19), ), - // (a > 10 AND a < 20) AND (a > 20 AND a < 30) - ( - col("a") - .gt(lit(10)) - .and(col("a").lt(lit(20))) - .and(col("a").gt(lit(20))) - .and(col("a").lt(lit(30))), - None, - None, - ), ]; for (expr, lower, upper) in test_cases { let boundaries = ExprBoundaries::try_new_unbounded(&schema).unwrap(); @@ -335,7 +375,9 @@ mod tests { df_schema.as_ref(), ) .unwrap(); - let actual = &analysis_result.boundaries[0].interval; + let Some(actual) = &analysis_result.boundaries[0].interval else { + panic!("The analysis result should contain non-empty intervals for all columns"); + }; let expected = Interval::make(lower, upper).unwrap(); assert_eq!( &expected, actual, @@ -344,6 +386,41 @@ mod tests { } } + #[test] + fn test_analyze_empty_set_boundary_exprs() { + let schema = Arc::new(Schema::new(vec![make_field("a", DataType::Int32)])); + + let test_cases: Vec<Expr> = vec![ + // a > 10 AND a < 10 + col("a").gt(lit(10)).and(col("a").lt(lit(10))), + // a > 5 AND (a < 20 OR a > 20) + // a > 10 AND a < 20 + // (a > 10 AND a < 20) AND (a > 20 AND a < 30) + col("a") + .gt(lit(10)) + .and(col("a").lt(lit(20))) + .and(col("a").gt(lit(20))) + 
.and(col("a").lt(lit(30))), + ]; + + for expr in test_cases { + let boundaries = ExprBoundaries::try_new_unbounded(&schema).unwrap(); + let df_schema = DFSchema::try_from(Arc::clone(&schema)).unwrap(); + let physical_expr = + create_physical_expr(&expr, &df_schema, &ExecutionProps::new()).unwrap(); + let analysis_result = analyze( + &physical_expr, + AnalysisContext::new(boundaries), + df_schema.as_ref(), + ) + .unwrap(); + + for boundary in analysis_result.boundaries { + assert!(boundary.interval.is_none()); + } + } + } + #[test] fn test_analyze_invalid_boundary_exprs() { let schema = Arc::new(Schema::new(vec![make_field("a", DataType::Int32)])); diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index ae4a15ba52..ec860b3a9f 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -41,7 +41,7 @@ use arrow::record_batch::RecordBatch; use datafusion_common::cast::as_boolean_array; use datafusion_common::stats::Precision; use datafusion_common::{ - internal_err, plan_err, project_schema, DataFusionError, Result, + internal_err, plan_err, project_schema, DataFusionError, Result, ScalarValue, }; use datafusion_execution::TaskContext; use datafusion_expr::Operator; @@ -457,6 +457,15 @@ fn collect_new_statistics( .. 
}, )| { + let Some(interval) = interval else { + // If the interval is `None`, we can say that there are no rows: + return ColumnStatistics { + null_count: Precision::Exact(0), + max_value: Precision::Exact(ScalarValue::Null), + min_value: Precision::Exact(ScalarValue::Null), + distinct_count: Precision::Exact(0), + }; + }; let (lower, upper) = interval.into_bounds(); let (min_value, max_value) = if lower.eq(&upper) { (Precision::Exact(lower), Precision::Exact(upper)) @@ -1078,14 +1087,16 @@ mod tests { statistics.column_statistics, vec![ ColumnStatistics { - min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), - max_value: Precision::Inexact(ScalarValue::Int32(Some(100))), - ..Default::default() + min_value: Precision::Exact(ScalarValue::Null), + max_value: Precision::Exact(ScalarValue::Null), + distinct_count: Precision::Exact(0), + null_count: Precision::Exact(0), }, ColumnStatistics { - min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), - max_value: Precision::Inexact(ScalarValue::Int32(Some(3))), - ..Default::default() + min_value: Precision::Exact(ScalarValue::Null), + max_value: Precision::Exact(ScalarValue::Null), + distinct_count: Precision::Exact(0), + null_count: Precision::Exact(0), }, ] ); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org