berkaysynnada commented on code in PR #21473:
URL: https://github.com/apache/datafusion/pull/21473#discussion_r3097325977


##########
datafusion/physical-plan/src/filter.rs:
##########
@@ -762,6 +787,60 @@ impl EmbeddedProjection for FilterExec {
     }
 }
 
+/// Collects column equality information from `col = literal` predicates in a
+/// conjunction.
+///
+/// Returns `(eq_columns, is_infeasible)`:
+/// - `eq_columns`: column indices constrained to a single value.
+/// - `is_infeasible`: `true` when the same column is equated to two different
+///   non-null literals (e.g. `name = 'alice' AND name = 'bob'`), which is
+///   always unsatisfiable.
+///
+/// Only AND conjunctions are traversed; OR is intentionally skipped
+/// since `a = 1 OR a = 2` does not pin NDV to 1.
+fn collect_equality_columns(
+    predicate: &Arc<dyn PhysicalExpr>,
+) -> (HashMap<usize, ScalarValue>, bool) {

Review Comment:
   The returned `HashMap<usize, ScalarValue>` is only used for its keys. The 
`ScalarValue`s are only needed internally to detect contradictions and are 
discarded by the caller. Could this return `(HashSet<usize>, bool)` (or even 
`(Vec<usize>, bool)`) to match what the caller actually needs?



##########
datafusion/physical-plan/src/filter.rs:
##########
@@ -308,44 +309,68 @@ impl FilterExec {
         &self.projection
     }
 
-    /// Calculates `Statistics` for `FilterExec`, by applying selectivity 
(either default, or estimated) to input statistics.
+    /// Calculates `Statistics` for `FilterExec`, by applying selectivity
+    /// (either default, or estimated) to input statistics.
+    ///
+    /// Equality predicates (`col = literal`) set NDV to `Exact(1)`, or
+    /// `Exact(0)` when the predicate is contradictory (e.g. `a = 1 AND a = 
2`).
     pub(crate) fn statistics_helper(
         schema: &SchemaRef,
         input_stats: Statistics,
         predicate: &Arc<dyn PhysicalExpr>,
         default_selectivity: u8,
     ) -> Result<Statistics> {
-        if !check_support(predicate, schema) {
-            let selectivity = default_selectivity as f64 / 100.0;
-            let mut stats = input_stats.to_inexact();
-            stats.num_rows = 
stats.num_rows.with_estimated_selectivity(selectivity);
-            stats.total_byte_size = stats
-                .total_byte_size
-                .with_estimated_selectivity(selectivity);
-            return Ok(stats);
-        }
+        let (eq_columns, is_infeasible) = collect_equality_columns(predicate);
 
         let num_rows = input_stats.num_rows;
         let total_byte_size = input_stats.total_byte_size;
-        let input_analysis_ctx =
-            AnalysisContext::try_from_statistics(schema, 
&input_stats.column_statistics)?;
 
-        let analysis_ctx = analyze(predicate, input_analysis_ctx, schema)?;
+        let (selectivity, mut column_statistics) = if is_infeasible {
+            (0.0, input_stats.to_inexact().column_statistics)
+        } else if !check_support(predicate, schema) {
+            (
+                default_selectivity as f64 / 100.0,
+                input_stats.to_inexact().column_statistics,
+            )
+        } else {
+            let input_analysis_ctx = AnalysisContext::try_from_statistics(
+                schema,
+                &input_stats.column_statistics,
+            )?;
+            let analysis_ctx = analyze(predicate, input_analysis_ctx, schema)?;
+            let selectivity = analysis_ctx.selectivity.unwrap_or(1.0);
+            let filtered_num_rows = 
num_rows.with_estimated_selectivity(selectivity);
+            (
+                selectivity,
+                collect_new_statistics(
+                    schema,
+                    &input_stats.column_statistics,
+                    analysis_ctx.boundaries,
+                    match &filtered_num_rows {
+                        Precision::Absent => None,
+                        p => Some(*p),
+                    },
+                ),
+            )
+        };
 
-        // Estimate (inexact) selectivity of predicate
-        let selectivity = analysis_ctx.selectivity.unwrap_or(1.0);
         let num_rows = num_rows.with_estimated_selectivity(selectivity);
         let total_byte_size = 
total_byte_size.with_estimated_selectivity(selectivity);
 
-        let column_statistics = collect_new_statistics(
-            schema,
-            &input_stats.column_statistics,
-            analysis_ctx.boundaries,
-            match &num_rows {
-                Precision::Absent => None,
-                p => Some(*p),
-            },
-        );
+        if is_infeasible {
+            for col_stat in &mut column_statistics {
+                col_stat.distinct_count = Precision::Exact(0);
+            }
+        } else {
+            for idx in eq_columns.keys() {
+                if *idx < column_statistics.len()
+                    && column_statistics[*idx].distinct_count != 
Precision::Exact(0)
+                {
+                    column_statistics[*idx].distinct_count = 
Precision::Exact(1);
+                }
+            }
+        }
+

Review Comment:
   The function branches on `is_infeasible` twice: once to pick the 
`(selectivity, column_statistics)` tuple, and again here at the bottom to 
decide between zeroing out `distinct_count` and setting it to 1. Would it be 
cleaner to move each branch's `distinct_count` adjustment into the branch where 
that `column_statistics` is produced, so each branch is self-contained and this 
second check goes away? For example:
   
   
   ```
   let (selectivity, column_statistics) = if is_infeasible {
       let mut cs = input_stats.to_inexact().column_statistics;
       for c in &mut cs { c.distinct_count = Precision::Exact(0); }
       (0.0, cs)
   } else if !check_support(predicate, schema) {
       let mut cs = input_stats.to_inexact().column_statistics;
       for &idx in eq_columns.keys() {
           if idx < cs.len() && cs[idx].distinct_count != Precision::Exact(0) {
               cs[idx].distinct_count = Precision::Exact(1);
           }
       }
       (default_selectivity as f64 / 100.0, cs)
   } else {
       // interval-analysis path — `collect_new_statistics` already sets
       // Exact(1) when the interval collapses to a single value, so no
       // post-fix is needed.
       let ctx = AnalysisContext::try_from_statistics(schema, 
&input_stats.column_statistics)?;
       let ctx = analyze(predicate, ctx, schema)?;
       let selectivity = ctx.selectivity.unwrap_or(1.0);
       let filtered = num_rows.with_estimated_selectivity(selectivity);
       let cs = collect_new_statistics(schema, &input_stats.column_statistics,
           ctx.boundaries,
           (!matches!(filtered, Precision::Absent)).then_some(filtered));
       (selectivity, cs)
   };
   ```



##########
datafusion/physical-plan/src/filter.rs:
##########
@@ -308,44 +309,68 @@ impl FilterExec {
         &self.projection
     }
 
-    /// Calculates `Statistics` for `FilterExec`, by applying selectivity 
(either default, or estimated) to input statistics.
+    /// Calculates `Statistics` for `FilterExec`, by applying selectivity
+    /// (either default, or estimated) to input statistics.
+    ///
+    /// Equality predicates (`col = literal`) set NDV to `Exact(1)`, or
+    /// `Exact(0)` when the predicate is contradictory (e.g. `a = 1 AND a = 
2`).
     pub(crate) fn statistics_helper(
         schema: &SchemaRef,
         input_stats: Statistics,
         predicate: &Arc<dyn PhysicalExpr>,
         default_selectivity: u8,
     ) -> Result<Statistics> {
-        if !check_support(predicate, schema) {
-            let selectivity = default_selectivity as f64 / 100.0;
-            let mut stats = input_stats.to_inexact();
-            stats.num_rows = 
stats.num_rows.with_estimated_selectivity(selectivity);
-            stats.total_byte_size = stats
-                .total_byte_size
-                .with_estimated_selectivity(selectivity);
-            return Ok(stats);
-        }
+        let (eq_columns, is_infeasible) = collect_equality_columns(predicate);
 
         let num_rows = input_stats.num_rows;
         let total_byte_size = input_stats.total_byte_size;
-        let input_analysis_ctx =
-            AnalysisContext::try_from_statistics(schema, 
&input_stats.column_statistics)?;
 
-        let analysis_ctx = analyze(predicate, input_analysis_ctx, schema)?;
+        let (selectivity, mut column_statistics) = if is_infeasible {
+            (0.0, input_stats.to_inexact().column_statistics)
+        } else if !check_support(predicate, schema) {
+            (
+                default_selectivity as f64 / 100.0,
+                input_stats.to_inexact().column_statistics,
+            )
+        } else {
+            let input_analysis_ctx = AnalysisContext::try_from_statistics(
+                schema,
+                &input_stats.column_statistics,
+            )?;
+            let analysis_ctx = analyze(predicate, input_analysis_ctx, schema)?;
+            let selectivity = analysis_ctx.selectivity.unwrap_or(1.0);
+            let filtered_num_rows = 
num_rows.with_estimated_selectivity(selectivity);
+            (
+                selectivity,
+                collect_new_statistics(

Review Comment:
   For the supported-type path, `collect_new_statistics` already sets 
`distinct_count = Exact(1)` when an interval collapses to a single value (the 
`is_single_value` check inside it). The bottom `eq_columns` loop then runs 
again and sets it to `Exact(1)` a second time. Not wrong, but redundant. If you 
take up the restructuring above, the override is only needed in the 
`!check_support` branch.



##########
datafusion/physical-plan/src/filter.rs:
##########
@@ -762,6 +787,60 @@ impl EmbeddedProjection for FilterExec {
     }
 }
 
+/// Collects column equality information from `col = literal` predicates in a
+/// conjunction.
+///
+/// Returns `(eq_columns, is_infeasible)`:
+/// - `eq_columns`: column indices constrained to a single value.
+/// - `is_infeasible`: `true` when the same column is equated to two different
+///   non-null literals (e.g. `name = 'alice' AND name = 'bob'`), which is
+///   always unsatisfiable.
+///
+/// Only AND conjunctions are traversed; OR is intentionally skipped
+/// since `a = 1 OR a = 2` does not pin NDV to 1.
+fn collect_equality_columns(
+    predicate: &Arc<dyn PhysicalExpr>,
+) -> (HashMap<usize, ScalarValue>, bool) {
+    let mut eq_columns: HashMap<usize, ScalarValue> = HashMap::new();
+    let mut infeasible = false;
+
+    for expr in split_conjunction(predicate) {
+        let Some(binary) = expr.downcast_ref::<BinaryExpr>() else {
+            continue;
+        };
+        if *binary.op() != Operator::Eq {
+            continue;
+        }
+        let left = binary.left();
+        let right = binary.right();
+        let pair = if let Some(col) = left.downcast_ref::<Column>()
+            && let Some(lit) = right.downcast_ref::<Literal>()
+            && !lit.value().is_null()
+        {
+            Some((col.index(), lit.value().clone()))
+        } else if let Some(col) = right.downcast_ref::<Column>()
+            && let Some(lit) = left.downcast_ref::<Literal>()
+            && !lit.value().is_null()
+        {
+            Some((col.index(), lit.value().clone()))
+        } else {
+            None
+        };
+
+        if let Some((idx, value)) = pair {
+            if let Some(prev) = eq_columns.get(&idx) {
+                if *prev != value {

Review Comment:
   You can use the `Entry` API here. Also, once `infeasible` is true, 
subsequent conjuncts don't change the outcome, so a `break` there avoids 
scanning the rest of the predicate.



##########
datafusion/physical-plan/src/filter.rs:
##########
@@ -308,44 +309,68 @@ impl FilterExec {
         &self.projection
     }
 
-    /// Calculates `Statistics` for `FilterExec`, by applying selectivity 
(either default, or estimated) to input statistics.
+    /// Calculates `Statistics` for `FilterExec`, by applying selectivity
+    /// (either default, or estimated) to input statistics.
+    ///
+    /// Equality predicates (`col = literal`) set NDV to `Exact(1)`, or
+    /// `Exact(0)` when the predicate is contradictory (e.g. `a = 1 AND a = 
2`).
     pub(crate) fn statistics_helper(
         schema: &SchemaRef,
         input_stats: Statistics,
         predicate: &Arc<dyn PhysicalExpr>,
         default_selectivity: u8,
     ) -> Result<Statistics> {
-        if !check_support(predicate, schema) {
-            let selectivity = default_selectivity as f64 / 100.0;
-            let mut stats = input_stats.to_inexact();
-            stats.num_rows = 
stats.num_rows.with_estimated_selectivity(selectivity);
-            stats.total_byte_size = stats
-                .total_byte_size
-                .with_estimated_selectivity(selectivity);
-            return Ok(stats);
-        }
+        let (eq_columns, is_infeasible) = collect_equality_columns(predicate);
 
         let num_rows = input_stats.num_rows;
         let total_byte_size = input_stats.total_byte_size;
-        let input_analysis_ctx =
-            AnalysisContext::try_from_statistics(schema, 
&input_stats.column_statistics)?;
 
-        let analysis_ctx = analyze(predicate, input_analysis_ctx, schema)?;
+        let (selectivity, mut column_statistics) = if is_infeasible {
+            (0.0, input_stats.to_inexact().column_statistics)
+        } else if !check_support(predicate, schema) {
+            (
+                default_selectivity as f64 / 100.0,
+                input_stats.to_inexact().column_statistics,
+            )
+        } else {
+            let input_analysis_ctx = AnalysisContext::try_from_statistics(
+                schema,
+                &input_stats.column_statistics,
+            )?;
+            let analysis_ctx = analyze(predicate, input_analysis_ctx, schema)?;
+            let selectivity = analysis_ctx.selectivity.unwrap_or(1.0);
+            let filtered_num_rows = 
num_rows.with_estimated_selectivity(selectivity);

Review Comment:
   `num_rows.with_estimated_selectivity(selectivity)` is computed once inside 
the interval branch as `filtered_num_rows`, then again right after the 
`if`/`else` chain for the returned `num_rows`. It could be computed once and 
reused.



##########
datafusion/physical-plan/src/filter.rs:
##########
@@ -308,44 +309,68 @@ impl FilterExec {
         &self.projection
     }
 
-    /// Calculates `Statistics` for `FilterExec`, by applying selectivity 
(either default, or estimated) to input statistics.
+    /// Calculates `Statistics` for `FilterExec`, by applying selectivity
+    /// (either default, or estimated) to input statistics.
+    ///
+    /// Equality predicates (`col = literal`) set NDV to `Exact(1)`, or
+    /// `Exact(0)` when the predicate is contradictory (e.g. `a = 1 AND a = 
2`).
     pub(crate) fn statistics_helper(
         schema: &SchemaRef,
         input_stats: Statistics,
         predicate: &Arc<dyn PhysicalExpr>,
         default_selectivity: u8,
     ) -> Result<Statistics> {
-        if !check_support(predicate, schema) {
-            let selectivity = default_selectivity as f64 / 100.0;
-            let mut stats = input_stats.to_inexact();
-            stats.num_rows = 
stats.num_rows.with_estimated_selectivity(selectivity);
-            stats.total_byte_size = stats
-                .total_byte_size
-                .with_estimated_selectivity(selectivity);
-            return Ok(stats);
-        }
+        let (eq_columns, is_infeasible) = collect_equality_columns(predicate);
 
         let num_rows = input_stats.num_rows;
         let total_byte_size = input_stats.total_byte_size;
-        let input_analysis_ctx =
-            AnalysisContext::try_from_statistics(schema, 
&input_stats.column_statistics)?;
 
-        let analysis_ctx = analyze(predicate, input_analysis_ctx, schema)?;
+        let (selectivity, mut column_statistics) = if is_infeasible {
+            (0.0, input_stats.to_inexact().column_statistics)
+        } else if !check_support(predicate, schema) {
+            (
+                default_selectivity as f64 / 100.0,
+                input_stats.to_inexact().column_statistics,
+            )
+        } else {
+            let input_analysis_ctx = AnalysisContext::try_from_statistics(
+                schema,
+                &input_stats.column_statistics,
+            )?;
+            let analysis_ctx = analyze(predicate, input_analysis_ctx, schema)?;
+            let selectivity = analysis_ctx.selectivity.unwrap_or(1.0);
+            let filtered_num_rows = 
num_rows.with_estimated_selectivity(selectivity);
+            (
+                selectivity,
+                collect_new_statistics(
+                    schema,
+                    &input_stats.column_statistics,
+                    analysis_ctx.boundaries,
+                    match &filtered_num_rows {
+                        Precision::Absent => None,
+                        p => Some(*p),
+                    },
+                ),
+            )
+        };
 
-        // Estimate (inexact) selectivity of predicate
-        let selectivity = analysis_ctx.selectivity.unwrap_or(1.0);
         let num_rows = num_rows.with_estimated_selectivity(selectivity);
         let total_byte_size = 
total_byte_size.with_estimated_selectivity(selectivity);
 
-        let column_statistics = collect_new_statistics(
-            schema,
-            &input_stats.column_statistics,
-            analysis_ctx.boundaries,
-            match &num_rows {
-                Precision::Absent => None,
-                p => Some(*p),
-            },
-        );
+        if is_infeasible {

Review Comment:
   When the predicate is infeasible we correctly set `num_rows = 0` and 
`distinct_count = Exact(0)` for every column, but `min_value`, `max_value`, and 
`null_count` carry over from the input unchanged. That leaves internally 
contradictory stats downstream; e.g. for `a = 1 AND a = 2` on an input with 
min=1, max=100, null_count=5, distinct=80, the output becomes rows=0, 
distinct=0, min=1, max=100, null_count=5. A reader that trusts null_count=5 on 
a zero-row plan would get confused.
   
   It is worth also setting `null_count` to `Exact(0)` and clearing 
`min_value`/`max_value` to `Precision::Absent`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to