jonathanc-n commented on code in PR #19957:
URL: https://github.com/apache/datafusion/pull/19957#discussion_r2897571210


##########
datafusion/physical-expr/src/expression_analyzer.rs:
##########
@@ -0,0 +1,1222 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Pluggable expression-level statistics analysis.
+//!
+//! This module provides an extensible mechanism for computing expression-level
+//! statistics metadata (selectivity, NDV, min/max bounds) following the chain
+//! of responsibility pattern.
+//!
+//! # Overview
+//!
+//! Different expressions have different statistical properties:
+//!
+//! - **Injective functions** (UPPER, LOWER, ABS on non-negative): preserve NDV
+//! - **Non-injective functions** (FLOOR, YEAR, SUBSTRING): reduce NDV
+//! - **Monotonic functions**: allow min/max bound propagation
+//! - **Constants**: NDV = 1, selectivity depends on value
+//!
+//! The default implementation uses classic Selinger-style estimation. Users 
can
+//! register custom [`ExpressionAnalyzer`] implementations to:
+//!
+//! 1. Provide statistics for custom UDFs
+//! 2. Override default estimation with domain-specific knowledge
+//! 3. Plug in advanced approaches (e.g., histogram-based estimation)
+//!
+//! # Example
+//!
+//! ```ignore
+//! use datafusion_physical_plan::expression_analyzer::*;
+//!
+//! // Create registry with default analyzer
+//! let mut registry = ExpressionAnalyzerRegistry::new();
+//!
+//! // Register custom analyzer (higher priority)
+//! registry.register(Arc::new(MyCustomAnalyzer));
+//!
+//! // Query expression statistics
+//! let selectivity = registry.get_selectivity(&predicate, &input_stats);
+//! ```
+
+use std::fmt::Debug;
+use std::sync::Arc;
+
+use datafusion_common::{ColumnStatistics, ScalarValue, Statistics};
+use datafusion_expr::Operator;
+
+use crate::expressions::{BinaryExpr, Column, Literal, NotExpr};
+use crate::{PhysicalExpr, ScalarFunctionExpr};
+
+// ============================================================================
+// AnalysisResult: Chain of responsibility result type
+// ============================================================================
+
+/// Result of expression analysis - either computed or delegate to next 
analyzer.
+#[derive(Debug, Clone)]
+pub enum AnalysisResult<T> {
+    /// Analysis was performed, here's the result
+    Computed(T),
+    /// This analyzer doesn't handle this expression; delegate to next
+    Delegate,
+}
+
+impl<T> AnalysisResult<T> {
+    /// Convert to Option, returning None for Delegate
+    pub fn into_option(self) -> Option<T> {
+        match self {
+            AnalysisResult::Computed(v) => Some(v),
+            AnalysisResult::Delegate => None,
+        }
+    }
+
+    /// Returns true if this is a Computed result
+    pub fn is_computed(&self) -> bool {
+        matches!(self, AnalysisResult::Computed(_))
+    }
+}
+
+// ============================================================================
+// ExpressionAnalyzer trait
+// ============================================================================
+
+/// Expression-level metadata analysis.
+///
+/// Implementations can handle specific expression types or provide domain
+/// knowledge for custom UDFs. The chain of analyzers is traversed until one
+/// returns [`AnalysisResult::Computed`].
+///
+/// # Implementing a Custom Analyzer
+///
+/// ```ignore
+/// #[derive(Debug)]
+/// struct MyUdfAnalyzer;
+///
+/// impl ExpressionAnalyzer for MyUdfAnalyzer {
+///     fn get_selectivity(
+///         &self,
+///         expr: &Arc<dyn PhysicalExpr>,
+///         input_stats: &Statistics,
+///     ) -> AnalysisResult<f64> {
+///         // Recognize my custom is_valid_email() UDF
+///         if is_my_email_validator(expr) {
+///             return AnalysisResult::Computed(0.8); // ~80% valid
+///         }
+///         AnalysisResult::Delegate
+///     }
+/// }
+/// ```
+pub trait ExpressionAnalyzer: Debug + Send + Sync {
+    /// Estimate selectivity when this expression is used as a predicate.
+    ///
+    /// Returns a value in [0.0, 1.0] representing the fraction of rows
+    /// that satisfy the predicate.
+    fn get_selectivity(
+        &self,
+        _expr: &Arc<dyn PhysicalExpr>,
+        _input_stats: &Statistics,
+    ) -> AnalysisResult<f64> {
+        AnalysisResult::Delegate
+    }
+
+    /// Estimate the number of distinct values in the expression's output.
+    ///
+    /// Properties:
+    /// - Injective functions preserve input NDV
+    /// - Non-injective functions reduce NDV (e.g., FLOOR, YEAR)
+    /// - Constants have NDV = 1
+    fn get_distinct_count(
+        &self,
+        _expr: &Arc<dyn PhysicalExpr>,
+        _input_stats: &Statistics,
+    ) -> AnalysisResult<usize> {
+        AnalysisResult::Delegate
+    }
+
+    /// Estimate min/max bounds of the expression's output.
+    ///
+    /// Monotonic functions can transform input bounds:
+    /// - Increasing: (f(min), f(max))
+    /// - Decreasing: (f(max), f(min))
+    /// - Non-monotonic: may need wider bounds or return Delegate
+    fn get_min_max(
+        &self,
+        _expr: &Arc<dyn PhysicalExpr>,
+        _input_stats: &Statistics,
+    ) -> AnalysisResult<(ScalarValue, ScalarValue)> {
+        AnalysisResult::Delegate
+    }
+
+    /// Estimate the fraction of null values in the expression's output.
+    ///
+    /// Returns a value in [0.0, 1.0].
+    fn get_null_fraction(
+        &self,
+        _expr: &Arc<dyn PhysicalExpr>,
+        _input_stats: &Statistics,
+    ) -> AnalysisResult<f64> {
+        AnalysisResult::Delegate
+    }
+}
+
+// ============================================================================
+// ExpressionAnalyzerRegistry
+// ============================================================================
+
+/// Registry that chains [`ExpressionAnalyzer`] implementations.
+///
+/// Analyzers are tried in order; the first to return 
[`AnalysisResult::Computed`]
+/// wins. Register domain-specific analyzers before the default for override.
+#[derive(Debug, Clone)]
+pub struct ExpressionAnalyzerRegistry {
+    analyzers: Vec<Arc<dyn ExpressionAnalyzer>>,
+}
+
+impl Default for ExpressionAnalyzerRegistry {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl ExpressionAnalyzerRegistry {
+    /// Create a new registry with the default expression analyzer.
+    pub fn new() -> Self {
+        Self {
+            analyzers: vec![Arc::new(DefaultExpressionAnalyzer)],
+        }
+    }
+
+    /// Create a registry with all built-in analyzers (string, math, datetime, 
default).
+    pub fn with_builtin_analyzers() -> Self {

Review Comment:
   nit: When called in projection.rs this is doing vec + 4 arc allocations. I 
think we can pass this in through sessioncontext to avoid initializing for 
every expression



##########
datafusion/datasource-parquet/src/metadata.rs:
##########
@@ -541,6 +541,36 @@ fn summarize_min_max_null_counts(
     )
     .map(|(idx, _)| idx);
 
+    // Extract distinct counts from row group column statistics
+    accumulators.distinct_counts_array[logical_schema_index] =
+        if let Some(parquet_idx) = parquet_index {
+            let distinct_counts: Vec<u64> = row_groups_metadata
+                .iter()
+                .filter_map(|rg| {
+                    rg.columns()
+                        .get(parquet_idx)
+                        .and_then(|col| col.statistics())
+                        .and_then(|stats| stats.distinct_count_opt())
+                })
+                .collect();
+
+            if distinct_counts.is_empty() {
+                Precision::Absent
+            } else if distinct_counts.len() == 1 {

Review Comment:
   If we are merging multiple row groups and only one of them have NDV, then it 
will trigger this path and assign that distinct_count to statistics despite 
having many unknown NDVs. 
   
   There are two scenarios here:
   
   large fraction of ndvs missing from row groups -> return Absent
   only a small fraction NDVs missing from row groups -> return inexact
   
   What the fraction should be is debatable, maybe like 1/4 or something



##########
datafusion/physical-expr/src/expression_analyzer.rs:
##########
@@ -0,0 +1,1222 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Pluggable expression-level statistics analysis.
+//!
+//! This module provides an extensible mechanism for computing expression-level
+//! statistics metadata (selectivity, NDV, min/max bounds) following the chain
+//! of responsibility pattern.
+//!
+//! # Overview
+//!
+//! Different expressions have different statistical properties:
+//!
+//! - **Injective functions** (UPPER, LOWER, ABS on non-negative): preserve NDV
+//! - **Non-injective functions** (FLOOR, YEAR, SUBSTRING): reduce NDV
+//! - **Monotonic functions**: allow min/max bound propagation
+//! - **Constants**: NDV = 1, selectivity depends on value
+//!
+//! The default implementation uses classic Selinger-style estimation. Users 
can
+//! register custom [`ExpressionAnalyzer`] implementations to:
+//!
+//! 1. Provide statistics for custom UDFs
+//! 2. Override default estimation with domain-specific knowledge
+//! 3. Plug in advanced approaches (e.g., histogram-based estimation)
+//!
+//! # Example
+//!
+//! ```ignore
+//! use datafusion_physical_plan::expression_analyzer::*;
+//!
+//! // Create registry with default analyzer
+//! let mut registry = ExpressionAnalyzerRegistry::new();
+//!
+//! // Register custom analyzer (higher priority)
+//! registry.register(Arc::new(MyCustomAnalyzer));
+//!
+//! // Query expression statistics
+//! let selectivity = registry.get_selectivity(&predicate, &input_stats);
+//! ```
+
+use std::fmt::Debug;
+use std::sync::Arc;
+
+use datafusion_common::{ColumnStatistics, ScalarValue, Statistics};
+use datafusion_expr::Operator;
+
+use crate::expressions::{BinaryExpr, Column, Literal, NotExpr};
+use crate::{PhysicalExpr, ScalarFunctionExpr};
+
+// ============================================================================
+// AnalysisResult: Chain of responsibility result type
+// ============================================================================
+
+/// Result of expression analysis - either computed or delegate to next 
analyzer.
+#[derive(Debug, Clone)]
+pub enum AnalysisResult<T> {
+    /// Analysis was performed, here's the result
+    Computed(T),
+    /// This analyzer doesn't handle this expression; delegate to next
+    Delegate,
+}
+
+impl<T> AnalysisResult<T> {
+    /// Convert to Option, returning None for Delegate
+    pub fn into_option(self) -> Option<T> {
+        match self {
+            AnalysisResult::Computed(v) => Some(v),
+            AnalysisResult::Delegate => None,
+        }
+    }
+
+    /// Returns true if this is a Computed result
+    pub fn is_computed(&self) -> bool {
+        matches!(self, AnalysisResult::Computed(_))
+    }
+}
+
+// ============================================================================
+// ExpressionAnalyzer trait
+// ============================================================================
+
+/// Expression-level metadata analysis.
+///
+/// Implementations can handle specific expression types or provide domain
+/// knowledge for custom UDFs. The chain of analyzers is traversed until one
+/// returns [`AnalysisResult::Computed`].
+///
+/// # Implementing a Custom Analyzer
+///
+/// ```ignore
+/// #[derive(Debug)]
+/// struct MyUdfAnalyzer;
+///
+/// impl ExpressionAnalyzer for MyUdfAnalyzer {
+///     fn get_selectivity(
+///         &self,
+///         expr: &Arc<dyn PhysicalExpr>,
+///         input_stats: &Statistics,
+///     ) -> AnalysisResult<f64> {
+///         // Recognize my custom is_valid_email() UDF
+///         if is_my_email_validator(expr) {
+///             return AnalysisResult::Computed(0.8); // ~80% valid
+///         }
+///         AnalysisResult::Delegate
+///     }
+/// }
+/// ```
+pub trait ExpressionAnalyzer: Debug + Send + Sync {
+    /// Estimate selectivity when this expression is used as a predicate.
+    ///
+    /// Returns a value in [0.0, 1.0] representing the fraction of rows
+    /// that satisfy the predicate.
+    fn get_selectivity(
+        &self,
+        _expr: &Arc<dyn PhysicalExpr>,
+        _input_stats: &Statistics,
+    ) -> AnalysisResult<f64> {
+        AnalysisResult::Delegate
+    }
+
+    /// Estimate the number of distinct values in the expression's output.
+    ///
+    /// Properties:
+    /// - Injective functions preserve input NDV
+    /// - Non-injective functions reduce NDV (e.g., FLOOR, YEAR)
+    /// - Constants have NDV = 1
+    fn get_distinct_count(
+        &self,
+        _expr: &Arc<dyn PhysicalExpr>,
+        _input_stats: &Statistics,
+    ) -> AnalysisResult<usize> {
+        AnalysisResult::Delegate
+    }
+
+    /// Estimate min/max bounds of the expression's output.
+    ///
+    /// Monotonic functions can transform input bounds:
+    /// - Increasing: (f(min), f(max))
+    /// - Decreasing: (f(max), f(min))
+    /// - Non-monotonic: may need wider bounds or return Delegate
+    fn get_min_max(
+        &self,
+        _expr: &Arc<dyn PhysicalExpr>,
+        _input_stats: &Statistics,
+    ) -> AnalysisResult<(ScalarValue, ScalarValue)> {
+        AnalysisResult::Delegate
+    }
+
+    /// Estimate the fraction of null values in the expression's output.
+    ///
+    /// Returns a value in [0.0, 1.0].
+    fn get_null_fraction(
+        &self,
+        _expr: &Arc<dyn PhysicalExpr>,
+        _input_stats: &Statistics,
+    ) -> AnalysisResult<f64> {
+        AnalysisResult::Delegate
+    }
+}
+
+// ============================================================================
+// ExpressionAnalyzerRegistry
+// ============================================================================
+
+/// Registry that chains [`ExpressionAnalyzer`] implementations.
+///
+/// Analyzers are tried in order; the first to return 
[`AnalysisResult::Computed`]
+/// wins. Register domain-specific analyzers before the default for override.
+#[derive(Debug, Clone)]
+pub struct ExpressionAnalyzerRegistry {
+    analyzers: Vec<Arc<dyn ExpressionAnalyzer>>,
+}
+
+impl Default for ExpressionAnalyzerRegistry {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl ExpressionAnalyzerRegistry {
+    /// Create a new registry with the default expression analyzer.
+    pub fn new() -> Self {
+        Self {
+            analyzers: vec![Arc::new(DefaultExpressionAnalyzer)],
+        }
+    }
+
+    /// Create a registry with all built-in analyzers (string, math, datetime, 
default).
+    pub fn with_builtin_analyzers() -> Self {
+        Self {
+            analyzers: vec![
+                Arc::new(StringFunctionAnalyzer),
+                Arc::new(MathFunctionAnalyzer),
+                Arc::new(DateTimeFunctionAnalyzer),
+                Arc::new(DefaultExpressionAnalyzer),
+            ],
+        }
+    }
+
+    /// Create a registry with custom analyzers (no default).
+    pub fn with_analyzers(analyzers: Vec<Arc<dyn ExpressionAnalyzer>>) -> Self 
{
+        Self { analyzers }
+    }
+
+    /// Create a registry with custom analyzers plus default as fallback.
+    pub fn with_analyzers_and_default(
+        analyzers: impl IntoIterator<Item = Arc<dyn ExpressionAnalyzer>>,
+    ) -> Self {
+        let mut all: Vec<Arc<dyn ExpressionAnalyzer>> = 
analyzers.into_iter().collect();
+        all.push(Arc::new(DefaultExpressionAnalyzer));
+        Self { analyzers: all }
+    }
+
+    /// Register an analyzer at the front of the chain (higher priority).
+    pub fn register(&mut self, analyzer: Arc<dyn ExpressionAnalyzer>) {
+        self.analyzers.insert(0, analyzer);
+    }
+
+    /// Get selectivity through the analyzer chain.
+    pub fn get_selectivity(
+        &self,
+        expr: &Arc<dyn PhysicalExpr>,
+        input_stats: &Statistics,
+    ) -> Option<f64> {
+        for analyzer in &self.analyzers {
+            if let AnalysisResult::Computed(sel) =
+                analyzer.get_selectivity(expr, input_stats)
+            {
+                return Some(sel);
+            }
+        }
+        None
+    }
+
+    /// Get distinct count through the analyzer chain.
+    pub fn get_distinct_count(
+        &self,
+        expr: &Arc<dyn PhysicalExpr>,
+        input_stats: &Statistics,
+    ) -> Option<usize> {
+        for analyzer in &self.analyzers {
+            if let AnalysisResult::Computed(ndv) =
+                analyzer.get_distinct_count(expr, input_stats)
+            {
+                return Some(ndv);
+            }
+        }
+        None
+    }
+
+    /// Get min/max bounds through the analyzer chain.
+    pub fn get_min_max(
+        &self,
+        expr: &Arc<dyn PhysicalExpr>,
+        input_stats: &Statistics,
+    ) -> Option<(ScalarValue, ScalarValue)> {
+        for analyzer in &self.analyzers {
+            if let AnalysisResult::Computed(bounds) =
+                analyzer.get_min_max(expr, input_stats)
+            {
+                return Some(bounds);
+            }
+        }
+        None
+    }
+
+    /// Get null fraction through the analyzer chain.
+    pub fn get_null_fraction(
+        &self,
+        expr: &Arc<dyn PhysicalExpr>,
+        input_stats: &Statistics,
+    ) -> Option<f64> {
+        for analyzer in &self.analyzers {
+            if let AnalysisResult::Computed(frac) =
+                analyzer.get_null_fraction(expr, input_stats)
+            {
+                return Some(frac);
+            }
+        }
+        None
+    }
+}
+
+// ============================================================================
+// DefaultExpressionAnalyzer
+// ============================================================================
+
+/// Default expression analyzer with Selinger-style estimation.
+///
+/// Handles common expression types:
+/// - Column references (uses column statistics)
+/// - Binary expressions (AND, OR, comparison operators)
+/// - Literals (constant selectivity/NDV)
+/// - NOT expressions (1 - child selectivity)
+#[derive(Debug, Default, Clone)]
+pub struct DefaultExpressionAnalyzer;
+
+impl DefaultExpressionAnalyzer {
+    /// Get column index from a Column expression
+    fn get_column_index(expr: &Arc<dyn PhysicalExpr>) -> Option<usize> {
+        expr.as_any().downcast_ref::<Column>().map(|c| c.index())
+    }
+
+    /// Get column statistics for an expression if it's a column reference
+    fn get_column_stats<'a>(
+        expr: &Arc<dyn PhysicalExpr>,
+        input_stats: &'a Statistics,
+    ) -> Option<&'a ColumnStatistics> {
+        Self::get_column_index(expr)
+            .and_then(|idx| input_stats.column_statistics.get(idx))
+    }
+
+    /// Recursive selectivity estimation
+    fn estimate_selectivity_recursive(
+        &self,
+        expr: &Arc<dyn PhysicalExpr>,
+        input_stats: &Statistics,
+    ) -> f64 {
+        if let AnalysisResult::Computed(sel) = self.get_selectivity(expr, 
input_stats) {
+            return sel;
+        }
+        0.5 // Default fallback
+    }
+}
+
+impl ExpressionAnalyzer for DefaultExpressionAnalyzer {
+    fn get_selectivity(
+        &self,
+        expr: &Arc<dyn PhysicalExpr>,
+        input_stats: &Statistics,
+    ) -> AnalysisResult<f64> {
+        // Binary expressions: AND, OR, comparisons
+        if let Some(binary) = expr.as_any().downcast_ref::<BinaryExpr>() {
+            let left_sel =
+                self.estimate_selectivity_recursive(binary.left(), 
input_stats);
+            let right_sel =
+                self.estimate_selectivity_recursive(binary.right(), 
input_stats);
+
+            let sel = match binary.op() {
+                // Logical operators
+                Operator::And => left_sel * right_sel,
+                Operator::Or => left_sel + right_sel - (left_sel * right_sel),
+
+                // Equality: selectivity = 1/NDV
+                Operator::Eq => {
+                    if let Some(ndv) = Self::get_column_stats(binary.left(), 
input_stats)
+                        .and_then(|s| s.distinct_count.get_value())
+                        .filter(|&&ndv| ndv > 0)
+                    {
+                        return AnalysisResult::Computed(1.0 / (*ndv as f64));
+                    }
+                    0.1 // Default equality selectivity
+                }
+
+                // Inequality: selectivity = 1 - 1/NDV
+                Operator::NotEq => {
+                    if let Some(ndv) = Self::get_column_stats(binary.left(), 
input_stats)
+                        .and_then(|s| s.distinct_count.get_value())
+                        .filter(|&&ndv| ndv > 0)
+                    {
+                        return AnalysisResult::Computed(1.0 - (1.0 / (*ndv as 
f64)));
+                    }
+                    0.9
+                }
+
+                // Range predicates: classic 1/3 estimate
+                Operator::Lt | Operator::LtEq | Operator::Gt | Operator::GtEq 
=> 0.33,
+
+                // LIKE: depends on pattern, use conservative estimate
+                Operator::LikeMatch | Operator::ILikeMatch => 0.25,
+                Operator::NotLikeMatch | Operator::NotILikeMatch => 0.75,
+
+                // Other operators: default
+                _ => 0.5,
+            };
+
+            return AnalysisResult::Computed(sel);
+        }
+
+        // NOT expression: 1 - child selectivity
+        if let Some(not_expr) = expr.as_any().downcast_ref::<NotExpr>() {
+            let child_sel =
+                self.estimate_selectivity_recursive(not_expr.arg(), 
input_stats);
+            return AnalysisResult::Computed(1.0 - child_sel);
+        }
+
+        // Literal boolean: 0.0 or 1.0
+        if let Some(b) = expr
+            .as_any()
+            .downcast_ref::<Literal>()
+            .and_then(|lit| match lit.value() {
+                ScalarValue::Boolean(Some(b)) => Some(*b),
+                _ => None,
+            })
+        {
+            return AnalysisResult::Computed(if b { 1.0 } else { 0.0 });
+        }
+
+        // Column reference as predicate (boolean column)
+        if expr.as_any().downcast_ref::<Column>().is_some() {
+            return AnalysisResult::Computed(0.5);
+        }
+
+        AnalysisResult::Delegate
+    }
+
+    fn get_distinct_count(
+        &self,
+        expr: &Arc<dyn PhysicalExpr>,
+        input_stats: &Statistics,
+    ) -> AnalysisResult<usize> {
+        // Column reference: use column NDV
+        if let Some(ndv) = Self::get_column_stats(expr, input_stats)
+            .and_then(|col_stats| 
col_stats.distinct_count.get_value().copied())
+        {
+            return AnalysisResult::Computed(ndv);
+        }
+
+        // Literal: NDV = 1
+        if expr.as_any().downcast_ref::<Literal>().is_some() {
+            return AnalysisResult::Computed(1);
+        }
+
+        // BinaryExpr: for injective operations (arithmetic with literal), 
preserve NDV
+        if let Some(binary) = expr.as_any().downcast_ref::<BinaryExpr>() {
+            let is_arithmetic = matches!(
+                binary.op(),
+                Operator::Plus
+                    | Operator::Minus
+                    | Operator::Multiply
+                    | Operator::Divide
+                    | Operator::Modulo
+            );
+
+            if is_arithmetic {
+                // If one side is a literal, the operation is injective on the 
other side
+                let left_is_literal = binary.left().as_any().is::<Literal>();
+                let right_is_literal = binary.right().as_any().is::<Literal>();
+
+                if left_is_literal {
+                    // NDV comes from right side
+                    if let AnalysisResult::Computed(ndv) =
+                        self.get_distinct_count(binary.right(), input_stats)
+                    {
+                        return AnalysisResult::Computed(ndv);
+                    }
+                } else if right_is_literal {
+                    // NDV comes from left side
+                    if let AnalysisResult::Computed(ndv) =
+                        self.get_distinct_count(binary.left(), input_stats)
+                    {
+                        return AnalysisResult::Computed(ndv);
+                    }
+                }
+                // Both sides are non-literals: could combine, but delegate 
for now
+            }
+        }
+
+        AnalysisResult::Delegate
+    }
+
+    fn get_min_max(
+        &self,
+        expr: &Arc<dyn PhysicalExpr>,
+        input_stats: &Statistics,
+    ) -> AnalysisResult<(ScalarValue, ScalarValue)> {
+        // Column reference: use column min/max
+        if let Some((min, max)) =
+            Self::get_column_stats(expr, input_stats).and_then(|col_stats| {
+                match (
+                    col_stats.min_value.get_value(),
+                    col_stats.max_value.get_value(),
+                ) {
+                    (Some(min), Some(max)) => Some((min.clone(), max.clone())),
+                    _ => None,
+                }
+            })
+        {
+            return AnalysisResult::Computed((min, max));
+        }
+
+        // Literal: min = max = value
+        if let Some(lit_expr) = expr.as_any().downcast_ref::<Literal>() {
+            let val = lit_expr.value().clone();
+            return AnalysisResult::Computed((val.clone(), val));
+        }
+
+        AnalysisResult::Delegate
+    }
+
+    fn get_null_fraction(
+        &self,
+        expr: &Arc<dyn PhysicalExpr>,
+        input_stats: &Statistics,
+    ) -> AnalysisResult<f64> {
+        // Column reference: null_count / num_rows
+        if let Some(fraction) =
+            Self::get_column_stats(expr, input_stats).and_then(|col_stats| {
+                let null_count = col_stats.null_count.get_value().copied()?;
+                let num_rows = input_stats.num_rows.get_value().copied()?;
+                if num_rows > 0 {
+                    Some(null_count as f64 / num_rows as f64)
+                } else {
+                    None
+                }
+            })
+        {
+            return AnalysisResult::Computed(fraction);
+        }
+
+        // Literal: null fraction depends on whether it's null
+        if let Some(lit_expr) = expr.as_any().downcast_ref::<Literal>() {
+            let is_null = lit_expr.value().is_null();
+            return AnalysisResult::Computed(if is_null { 1.0 } else { 0.0 });
+        }
+
+        AnalysisResult::Delegate
+    }
+}
+
+// ============================================================================
+// StringFunctionAnalyzer
+// ============================================================================
+
+/// Analyzer for string functions.
+///
+/// - Injective (preserve NDV): UPPER, LOWER, TRIM, LTRIM, RTRIM, REVERSE
+/// - Non-injective (reduce NDV): SUBSTRING, LEFT, RIGHT, REPLACE
+#[derive(Debug, Default, Clone)]
+pub struct StringFunctionAnalyzer;
+
+impl StringFunctionAnalyzer {
+    /// Check if a function is injective (one-to-one)
+    pub fn is_injective(func_name: &str) -> bool {
+        matches!(
+            func_name.to_uppercase().as_str(),
+            "UPPER" | "LOWER" | "TRIM" | "LTRIM" | "RTRIM" | "REVERSE" | 
"INITCAP"
+        )
+    }
+
+    /// Get NDV reduction factor for non-injective functions
+    pub fn ndv_reduction_factor(func_name: &str) -> Option<f64> {
+        match func_name.to_uppercase().as_str() {
+            "SUBSTRING" | "LEFT" | "RIGHT" => Some(0.5),
+            "REPLACE" => Some(0.8),
+            _ => None,
+        }
+    }
+}
+
+impl ExpressionAnalyzer for StringFunctionAnalyzer {
+    fn get_distinct_count(
+        &self,
+        expr: &Arc<dyn PhysicalExpr>,
+        input_stats: &Statistics,
+    ) -> AnalysisResult<usize> {
+        let Some(func) = expr.as_any().downcast_ref::<ScalarFunctionExpr>() 
else {
+            return AnalysisResult::Delegate;
+        };
+
+        let func_name = func.name();
+        let Some(first_arg) = func.args().first() else {
+            return AnalysisResult::Delegate;
+        };
+
+        // Get input NDV
+        let Some(input_ndv) = DefaultExpressionAnalyzer

Review Comment:
   For this API using DefaultExpressionAnalyzer here might not be what we want
   
   If i had `impl ExpressionAnalyzer for InsertNewNDV` the problem is that I 
would have to handle all the cases with different column functions (ex. UPPER, 
LOWER, etc.), leading to a lot of duplicate functionality. Instead what if we 
pass in the list of analyzers here is that instead of 
`DefaultExpressionAnalyzer.get_distinct_count` we can call 
`analyzer_list[0].get_distinct_count` or something. 
   
   So if we add a custom analyzer to change the input NDV with our own 
statistics (ex. histogram), we do not need to reimplement the other 
functionality



##########
datafusion/physical-expr/src/expression_analyzer.rs:
##########
@@ -0,0 +1,1222 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Pluggable expression-level statistics analysis.
+//!
+//! This module provides an extensible mechanism for computing expression-level
+//! statistics metadata (selectivity, NDV, min/max bounds) following the chain
+//! of responsibility pattern.
+//!
+//! # Overview
+//!
+//! Different expressions have different statistical properties:
+//!
+//! - **Injective functions** (UPPER, LOWER, ABS on non-negative): preserve NDV
+//! - **Non-injective functions** (FLOOR, YEAR, SUBSTRING): reduce NDV
+//! - **Monotonic functions**: allow min/max bound propagation
+//! - **Constants**: NDV = 1, selectivity depends on value
+//!
+//! The default implementation uses classic Selinger-style estimation. Users 
can
+//! register custom [`ExpressionAnalyzer`] implementations to:
+//!
+//! 1. Provide statistics for custom UDFs
+//! 2. Override default estimation with domain-specific knowledge
+//! 3. Plug in advanced approaches (e.g., histogram-based estimation)
+//!
+//! # Example
+//!
+//! ```ignore
+//! use datafusion_physical_plan::expression_analyzer::*;
+//!
+//! // Create registry with default analyzer
+//! let mut registry = ExpressionAnalyzerRegistry::new();
+//!
+//! // Register custom analyzer (higher priority)
+//! registry.register(Arc::new(MyCustomAnalyzer));
+//!
+//! // Query expression statistics
+//! let selectivity = registry.get_selectivity(&predicate, &input_stats);
+//! ```
+
+use std::fmt::Debug;
+use std::sync::Arc;
+
+use datafusion_common::{ColumnStatistics, ScalarValue, Statistics};
+use datafusion_expr::Operator;
+
+use crate::expressions::{BinaryExpr, Column, Literal, NotExpr};
+use crate::{PhysicalExpr, ScalarFunctionExpr};
+
+// ============================================================================
+// AnalysisResult: Chain of responsibility result type
+// ============================================================================
+
+/// Result of expression analysis - either computed or delegate to next 
analyzer.
+#[derive(Debug, Clone)]
+pub enum AnalysisResult<T> {
+    /// Analysis was performed, here's the result
+    Computed(T),
+    /// This analyzer doesn't handle this expression; delegate to next
+    Delegate,
+}
+
+impl<T> AnalysisResult<T> {
+    /// Convert to Option, returning None for Delegate
+    pub fn into_option(self) -> Option<T> {
+        match self {
+            AnalysisResult::Computed(v) => Some(v),
+            AnalysisResult::Delegate => None,
+        }
+    }
+
+    /// Returns true if this is a Computed result
+    pub fn is_computed(&self) -> bool {
+        matches!(self, AnalysisResult::Computed(_))
+    }
+}
+
+// ============================================================================
+// ExpressionAnalyzer trait
+// ============================================================================
+
+/// Expression-level metadata analysis.
+///
+/// Implementations can handle specific expression types or provide domain
+/// knowledge for custom UDFs. The chain of analyzers is traversed until one
+/// returns [`AnalysisResult::Computed`].
+///
+/// # Implementing a Custom Analyzer
+///
+/// ```ignore
+/// #[derive(Debug)]
+/// struct MyUdfAnalyzer;
+///
+/// impl ExpressionAnalyzer for MyUdfAnalyzer {
+///     fn get_selectivity(
+///         &self,
+///         expr: &Arc<dyn PhysicalExpr>,
+///         input_stats: &Statistics,
+///     ) -> AnalysisResult<f64> {
+///         // Recognize my custom is_valid_email() UDF
+///         if is_my_email_validator(expr) {
+///             return AnalysisResult::Computed(0.8); // ~80% valid
+///         }
+///         AnalysisResult::Delegate
+///     }
+/// }
+/// ```
+pub trait ExpressionAnalyzer: Debug + Send + Sync {
+    /// Estimate selectivity when this expression is used as a predicate.
+    ///
+    /// Returns a value in [0.0, 1.0] representing the fraction of rows
+    /// that satisfy the predicate.
+    fn get_selectivity(
+        &self,
+        _expr: &Arc<dyn PhysicalExpr>,
+        _input_stats: &Statistics,
+    ) -> AnalysisResult<f64> {
+        AnalysisResult::Delegate
+    }
+
+    /// Estimate the number of distinct values in the expression's output.
+    ///
+    /// Properties:
+    /// - Injective functions preserve input NDV
+    /// - Non-injective functions reduce NDV (e.g., FLOOR, YEAR)
+    /// - Constants have NDV = 1
+    fn get_distinct_count(
+        &self,
+        _expr: &Arc<dyn PhysicalExpr>,
+        _input_stats: &Statistics,
+    ) -> AnalysisResult<usize> {
+        AnalysisResult::Delegate
+    }
+
+    /// Estimate min/max bounds of the expression's output.
+    ///
+    /// Monotonic functions can transform input bounds:
+    /// - Increasing: (f(min), f(max))
+    /// - Decreasing: (f(max), f(min))
+    /// - Non-monotonic: may need wider bounds or return Delegate
+    fn get_min_max(
+        &self,
+        _expr: &Arc<dyn PhysicalExpr>,
+        _input_stats: &Statistics,
+    ) -> AnalysisResult<(ScalarValue, ScalarValue)> {
+        AnalysisResult::Delegate
+    }
+
+    /// Estimate the fraction of null values in the expression's output.
+    ///
+    /// Returns a value in [0.0, 1.0].
+    fn get_null_fraction(
+        &self,
+        _expr: &Arc<dyn PhysicalExpr>,
+        _input_stats: &Statistics,
+    ) -> AnalysisResult<f64> {
+        AnalysisResult::Delegate
+    }
+}
+
+// ============================================================================
+// ExpressionAnalyzerRegistry
+// ============================================================================
+
+/// Registry that chains [`ExpressionAnalyzer`] implementations.
+///
+/// Analyzers are tried in order; the first to return 
[`AnalysisResult::Computed`]
+/// wins. Register domain-specific analyzers before the default for override.
+#[derive(Debug, Clone)]
+pub struct ExpressionAnalyzerRegistry {
+    analyzers: Vec<Arc<dyn ExpressionAnalyzer>>,
+}
+
+impl Default for ExpressionAnalyzerRegistry {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl ExpressionAnalyzerRegistry {
+    /// Create a new registry with the default expression analyzer.
+    pub fn new() -> Self {
+        Self {
+            analyzers: vec![Arc::new(DefaultExpressionAnalyzer)],
+        }
+    }
+
+    /// Create a registry with all built-in analyzers (string, math, datetime, 
default).
+    pub fn with_builtin_analyzers() -> Self {
+        Self {
+            analyzers: vec![
+                Arc::new(StringFunctionAnalyzer),
+                Arc::new(MathFunctionAnalyzer),
+                Arc::new(DateTimeFunctionAnalyzer),
+                Arc::new(DefaultExpressionAnalyzer),
+            ],
+        }
+    }
+
+    /// Create a registry with custom analyzers (no default).
+    pub fn with_analyzers(analyzers: Vec<Arc<dyn ExpressionAnalyzer>>) -> Self 
{
+        Self { analyzers }
+    }
+
+    /// Create a registry with custom analyzers plus default as fallback.
+    pub fn with_analyzers_and_default(
+        analyzers: impl IntoIterator<Item = Arc<dyn ExpressionAnalyzer>>,
+    ) -> Self {
+        let mut all: Vec<Arc<dyn ExpressionAnalyzer>> = 
analyzers.into_iter().collect();
+        all.push(Arc::new(DefaultExpressionAnalyzer));
+        Self { analyzers: all }
+    }
+
+    /// Register an analyzer at the front of the chain (higher priority).
+    pub fn register(&mut self, analyzer: Arc<dyn ExpressionAnalyzer>) {
+        self.analyzers.insert(0, analyzer);
+    }
+
+    /// Get selectivity through the analyzer chain.
+    pub fn get_selectivity(
+        &self,
+        expr: &Arc<dyn PhysicalExpr>,
+        input_stats: &Statistics,
+    ) -> Option<f64> {
+        for analyzer in &self.analyzers {
+            if let AnalysisResult::Computed(sel) =
+                analyzer.get_selectivity(expr, input_stats)
+            {
+                return Some(sel);
+            }
+        }
+        None
+    }
+
+    /// Get distinct count through the analyzer chain.
+    pub fn get_distinct_count(
+        &self,
+        expr: &Arc<dyn PhysicalExpr>,
+        input_stats: &Statistics,
+    ) -> Option<usize> {
+        for analyzer in &self.analyzers {
+            if let AnalysisResult::Computed(ndv) =
+                analyzer.get_distinct_count(expr, input_stats)
+            {
+                return Some(ndv);
+            }
+        }
+        None
+    }
+
+    /// Get min/max bounds through the analyzer chain.
+    pub fn get_min_max(
+        &self,
+        expr: &Arc<dyn PhysicalExpr>,
+        input_stats: &Statistics,
+    ) -> Option<(ScalarValue, ScalarValue)> {
+        for analyzer in &self.analyzers {
+            if let AnalysisResult::Computed(bounds) =
+                analyzer.get_min_max(expr, input_stats)
+            {
+                return Some(bounds);
+            }
+        }
+        None
+    }
+
+    /// Get null fraction through the analyzer chain.
+    pub fn get_null_fraction(
+        &self,
+        expr: &Arc<dyn PhysicalExpr>,
+        input_stats: &Statistics,
+    ) -> Option<f64> {
+        for analyzer in &self.analyzers {
+            if let AnalysisResult::Computed(frac) =
+                analyzer.get_null_fraction(expr, input_stats)
+            {
+                return Some(frac);
+            }
+        }
+        None
+    }
+}
+
+// ============================================================================
+// DefaultExpressionAnalyzer
+// ============================================================================
+
+/// Default expression analyzer with Selinger-style estimation.
+///
+/// Handles common expression types:
+/// - Column references (uses column statistics)
+/// - Binary expressions (AND, OR, comparison operators)
+/// - Literals (constant selectivity/NDV)
+/// - NOT expressions (1 - child selectivity)
+#[derive(Debug, Default, Clone)]
+pub struct DefaultExpressionAnalyzer;
+
+impl DefaultExpressionAnalyzer {
+    /// Get column index from a Column expression
+    fn get_column_index(expr: &Arc<dyn PhysicalExpr>) -> Option<usize> {
+        expr.as_any().downcast_ref::<Column>().map(|c| c.index())
+    }
+
+    /// Get column statistics for an expression if it's a column reference
+    fn get_column_stats<'a>(
+        expr: &Arc<dyn PhysicalExpr>,
+        input_stats: &'a Statistics,
+    ) -> Option<&'a ColumnStatistics> {
+        Self::get_column_index(expr)
+            .and_then(|idx| input_stats.column_statistics.get(idx))
+    }
+
+    /// Recursive selectivity estimation
+    fn estimate_selectivity_recursive(
+        &self,
+        expr: &Arc<dyn PhysicalExpr>,
+        input_stats: &Statistics,
+    ) -> f64 {
+        if let AnalysisResult::Computed(sel) = self.get_selectivity(expr, 
input_stats) {
+            return sel;
+        }
+        0.5 // Default fallback
+    }
+}
+
+impl ExpressionAnalyzer for DefaultExpressionAnalyzer {

Review Comment:
   Maybe we could move all of this to its own folder later on to declutter. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to