This is an automated email from the ASF dual-hosted git repository.

berkay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 71996fba9b make AnalysisContext aware of empty sets to represent certainly false bounds (#14279)
71996fba9b is described below

commit 71996fba9bdc797ba142dc41d6e830ae60d8e1ac
Author: Burak Şen <buraks...@gmail.com>
AuthorDate: Tue Jan 28 17:38:13 2025 +0300

    make AnalysisContext aware of empty sets to represent certainly false bounds (#14279)
    
    * ready for review
    
    * fmt and lint
    
    * Apply suggestions from code review
    
    Co-authored-by: Mehmet Ozan Kabak <ozanka...@gmail.com>
    
    * apply reviews
    
    * fix test
    
    * Update analysis.rs
    
    * Update analysis.rs
    
    ---------
    
    Co-authored-by: Mehmet Ozan Kabak <ozanka...@gmail.com>
    Co-authored-by: berkaysynnada <berkay.sa...@synnada.ai>
---
 datafusion-examples/examples/expr_api.rs |   2 +-
 datafusion/physical-expr/src/analysis.rs | 185 ++++++++++++++++++++++---------
 datafusion/physical-plan/src/filter.rs   |  25 +++--
 3 files changed, 150 insertions(+), 62 deletions(-)

diff --git a/datafusion-examples/examples/expr_api.rs b/datafusion-examples/examples/expr_api.rs
index 6bfde2ebbf..2908edbb75 100644
--- a/datafusion-examples/examples/expr_api.rs
+++ b/datafusion-examples/examples/expr_api.rs
@@ -270,7 +270,7 @@ fn range_analysis_demo() -> Result<()> {
     // In this case, we can see that, as expected, `analyze` has figured out
     // that in this case,  `date` must be in the range `['2020-09-01', '2020-10-01']`
     let expected_range = Interval::try_new(september_1, october_1)?;
-    assert_eq!(analysis_result.boundaries[0].interval, expected_range);
+    assert_eq!(analysis_result.boundaries[0].interval, Some(expected_range));
 
     Ok(())
 }
diff --git a/datafusion/physical-expr/src/analysis.rs b/datafusion/physical-expr/src/analysis.rs
index b602a9cba4..ceec21c711 100644
--- a/datafusion/physical-expr/src/analysis.rs
+++ b/datafusion/physical-expr/src/analysis.rs
@@ -81,8 +81,12 @@ impl AnalysisContext {
 #[derive(Clone, Debug, PartialEq)]
 pub struct ExprBoundaries {
     pub column: Column,
-    /// Minimum and maximum values this expression can have.
-    pub interval: Interval,
+    /// Minimum and maximum values this expression can have. A `None` value
+    /// indicates that evaluating the given column results in an empty set.
+    /// For example, if the column `a` has values in the range [10, 20],
+    /// and there is a filter asserting that `a > 50`, then the resulting interval
+    /// range of `a` will be `None`.
+    pub interval: Option<Interval>,
     /// Maximum number of distinct values this expression can produce, if known.
     pub distinct_count: Precision<usize>,
 }
@@ -118,7 +122,7 @@ impl ExprBoundaries {
         let column = Column::new(field.name(), col_index);
         Ok(ExprBoundaries {
             column,
-            interval,
+            interval: Some(interval),
             distinct_count: col_stats.distinct_count,
         })
     }
@@ -133,7 +137,7 @@ impl ExprBoundaries {
             .map(|(i, field)| {
                 Ok(Self {
                     column: Column::new(field.name(), i),
-                    interval: Interval::make_unbounded(field.data_type())?,
+                    interval: Some(Interval::make_unbounded(field.data_type())?),
                     distinct_count: Precision::Absent,
                 })
             })
@@ -161,40 +165,71 @@ pub fn analyze(
     context: AnalysisContext,
     schema: &Schema,
 ) -> Result<AnalysisContext> {
-    let target_boundaries = context.boundaries;
-
-    let mut graph = ExprIntervalGraph::try_new(Arc::clone(expr), schema)?;
-
-    let columns = collect_columns(expr)
-        .into_iter()
-        .map(|c| Arc::new(c) as _)
-        .collect::<Vec<_>>();
-
-    let target_expr_and_indices = graph.gather_node_indices(columns.as_slice());
-
-    let mut target_indices_and_boundaries = target_expr_and_indices
+    let initial_boundaries = &context.boundaries;
+    if initial_boundaries
         .iter()
-        .filter_map(|(expr, i)| {
-            target_boundaries.iter().find_map(|bound| {
-                expr.as_any()
-                    .downcast_ref::<Column>()
-                    .filter(|expr_column| bound.column.eq(*expr_column))
-                    .map(|_| (*i, bound.interval.clone()))
-            })
-        })
-        .collect::<Vec<_>>();
-
-    match graph
-        .update_ranges(&mut target_indices_and_boundaries, Interval::CERTAINLY_TRUE)?
+        .all(|bound| bound.interval.is_none())
     {
-        PropagationResult::Success => {
-            shrink_boundaries(graph, target_boundaries, target_expr_and_indices)
+        if initial_boundaries
+            .iter()
+            .any(|bound| bound.distinct_count != Precision::Exact(0))
+        {
+            return internal_err!(
+                "ExprBoundaries has a non-zero distinct count although it represents an empty table"
+            );
+        }
+        if context.selectivity != Some(0.0) {
+            return internal_err!(
+                "AnalysisContext has a non-zero selectivity although it represents an empty table"
+            );
         }
-        PropagationResult::Infeasible => {
-            Ok(AnalysisContext::new(target_boundaries).with_selectivity(0.0))
+        Ok(context)
+    } else if initial_boundaries
+        .iter()
+        .any(|bound| bound.interval.is_none())
+    {
+        internal_err!(
+                "AnalysisContext is an inconsistent state. Some columns represent empty table while others don't"
+            )
+    } else {
+        let mut target_boundaries = context.boundaries;
+        let mut graph = ExprIntervalGraph::try_new(Arc::clone(expr), schema)?;
+        let columns = collect_columns(expr)
+            .into_iter()
+            .map(|c| Arc::new(c) as _)
+            .collect::<Vec<_>>();
+
+        let mut target_indices_and_boundaries = vec![];
+        let target_expr_and_indices = graph.gather_node_indices(columns.as_slice());
+
+        for (expr, index) in &target_expr_and_indices {
+            if let Some(column) = expr.as_any().downcast_ref::<Column>() {
+                if let Some(bound) =
+                    target_boundaries.iter().find(|b| b.column == *column)
+                {
+                    // Now, it's safe to unwrap
+                    target_indices_and_boundaries
+                        .push((*index, bound.interval.as_ref().unwrap().clone()));
+                }
+            }
         }
-        PropagationResult::CannotPropagate => {
-            Ok(AnalysisContext::new(target_boundaries).with_selectivity(1.0))
+
+        match graph
+            .update_ranges(&mut target_indices_and_boundaries, Interval::CERTAINLY_TRUE)?
+        {
+            PropagationResult::Success => {
+                shrink_boundaries(graph, target_boundaries, target_expr_and_indices)
+            }
+            PropagationResult::Infeasible => {
+                // If the propagation result is infeasible, set intervals to None
+                target_boundaries
+                    .iter_mut()
+                    .for_each(|bound| bound.interval = None);
+                Ok(AnalysisContext::new(target_boundaries).with_selectivity(0.0))
+            }
+            PropagationResult::CannotPropagate => {
+                Ok(AnalysisContext::new(target_boundaries).with_selectivity(1.0))
+            }
         }
     }
 }
@@ -215,12 +250,12 @@ fn shrink_boundaries(
                 .iter_mut()
                 .find(|bound| bound.column.eq(column))
             {
-                bound.interval = graph.get_interval(*i);
+                bound.interval = Some(graph.get_interval(*i));
             };
         }
     });
 
-    let selectivity = calculate_selectivity(&target_boundaries, &initial_boundaries);
+    let selectivity = calculate_selectivity(&target_boundaries, &initial_boundaries)?;
 
     if !(0.0..=1.0).contains(&selectivity) {
         return internal_err!("Selectivity is out of limit: {}", selectivity);
@@ -235,16 +270,31 @@ fn shrink_boundaries(
 fn calculate_selectivity(
     target_boundaries: &[ExprBoundaries],
     initial_boundaries: &[ExprBoundaries],
-) -> f64 {
+) -> Result<f64> {
     // Since the intervals are assumed uniform and the values
     // are not correlated, we need to multiply the selectivities
     // of multiple columns to get the overall selectivity.
-    initial_boundaries
-        .iter()
-        .zip(target_boundaries.iter())
-        .fold(1.0, |acc, (initial, target)| {
-            acc * cardinality_ratio(&initial.interval, &target.interval)
-        })
+    if target_boundaries.len() != initial_boundaries.len() {
+        return Err(internal_datafusion_err!(
+            "The number of columns in the initial and target boundaries should be the same"
+        ));
+    }
+    let mut acc: f64 = 1.0;
+    for (initial, target) in initial_boundaries.iter().zip(target_boundaries) {
+        match (initial.interval.as_ref(), target.interval.as_ref()) {
+            (Some(initial), Some(target)) => {
+                acc *= cardinality_ratio(initial, target);
+            }
+            (None, Some(_)) => {
+                return internal_err!(
+                "Initial boundary cannot be None while having a Some() target boundary"
+            );
+            }
+            _ => return Ok(0.0),
+        }
+    }
+
+    Ok(acc)
 }
 
 #[cfg(test)]
@@ -313,16 +363,6 @@ mod tests {
                 Some(16),
                 Some(19),
             ),
-            // (a > 10 AND a < 20) AND (a > 20 AND a < 30)
-            (
-                col("a")
-                    .gt(lit(10))
-                    .and(col("a").lt(lit(20)))
-                    .and(col("a").gt(lit(20)))
-                    .and(col("a").lt(lit(30))),
-                None,
-                None,
-            ),
         ];
         for (expr, lower, upper) in test_cases {
             let boundaries = ExprBoundaries::try_new_unbounded(&schema).unwrap();
@@ -335,7 +375,9 @@ mod tests {
                 df_schema.as_ref(),
             )
             .unwrap();
-            let actual = &analysis_result.boundaries[0].interval;
+            let Some(actual) = &analysis_result.boundaries[0].interval else {
+                panic!("The analysis result should contain non-empty intervals for all columns");
+            };
             let expected = Interval::make(lower, upper).unwrap();
             assert_eq!(
                 &expected, actual,
@@ -344,6 +386,41 @@ mod tests {
         }
     }
 
+    #[test]
+    fn test_analyze_empty_set_boundary_exprs() {
+        let schema = Arc::new(Schema::new(vec![make_field("a", DataType::Int32)]));
+
+        let test_cases: Vec<Expr> = vec![
+            // a > 10 AND a < 10
+            col("a").gt(lit(10)).and(col("a").lt(lit(10))),
+            // a > 5 AND (a < 20 OR a > 20)
+            // a > 10 AND a < 20
+            // (a > 10 AND a < 20) AND (a > 20 AND a < 30)
+            col("a")
+                .gt(lit(10))
+                .and(col("a").lt(lit(20)))
+                .and(col("a").gt(lit(20)))
+                .and(col("a").lt(lit(30))),
+        ];
+
+        for expr in test_cases {
+            let boundaries = ExprBoundaries::try_new_unbounded(&schema).unwrap();
+            let df_schema = DFSchema::try_from(Arc::clone(&schema)).unwrap();
+            let physical_expr =
+                create_physical_expr(&expr, &df_schema, &ExecutionProps::new()).unwrap();
+            let analysis_result = analyze(
+                &physical_expr,
+                AnalysisContext::new(boundaries),
+                df_schema.as_ref(),
+            )
+            .unwrap();
+
+            for boundary in analysis_result.boundaries {
+                assert!(boundary.interval.is_none());
+            }
+        }
+    }
+
     #[test]
     fn test_analyze_invalid_boundary_exprs() {
         let schema = Arc::new(Schema::new(vec![make_field("a", DataType::Int32)]));
diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs
index ae4a15ba52..ec860b3a9f 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -41,7 +41,7 @@ use arrow::record_batch::RecordBatch;
 use datafusion_common::cast::as_boolean_array;
 use datafusion_common::stats::Precision;
 use datafusion_common::{
-    internal_err, plan_err, project_schema, DataFusionError, Result,
+    internal_err, plan_err, project_schema, DataFusionError, Result, ScalarValue,
 };
 use datafusion_execution::TaskContext;
 use datafusion_expr::Operator;
@@ -457,6 +457,15 @@ fn collect_new_statistics(
                     ..
                 },
             )| {
+                let Some(interval) = interval else {
+                    // If the interval is `None`, we can say that there are no rows:
+                    return ColumnStatistics {
+                        null_count: Precision::Exact(0),
+                        max_value: Precision::Exact(ScalarValue::Null),
+                        min_value: Precision::Exact(ScalarValue::Null),
+                        distinct_count: Precision::Exact(0),
+                    };
+                };
                 let (lower, upper) = interval.into_bounds();
                 let (min_value, max_value) = if lower.eq(&upper) {
                     (Precision::Exact(lower), Precision::Exact(upper))
@@ -1078,14 +1087,16 @@ mod tests {
             statistics.column_statistics,
             vec![
                 ColumnStatistics {
-                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
-                    max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
-                    ..Default::default()
+                    min_value: Precision::Exact(ScalarValue::Null),
+                    max_value: Precision::Exact(ScalarValue::Null),
+                    distinct_count: Precision::Exact(0),
+                    null_count: Precision::Exact(0),
                 },
                 ColumnStatistics {
-                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
-                    max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
-                    ..Default::default()
+                    min_value: Precision::Exact(ScalarValue::Null),
+                    max_value: Precision::Exact(ScalarValue::Null),
+                    distinct_count: Precision::Exact(0),
+                    null_count: Precision::Exact(0),
                 },
             ]
         );


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to