This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new daa8f52c16 fix: interval analysis error when have two filterexec that 
inner filter proves zero selectivity (#20743)
daa8f52c16 is described below

commit daa8f52c1684c5f0c9629763309a0bfc401e6129
Author: Huaijin <[email protected]>
AuthorDate: Tue Mar 10 23:54:14 2026 +0800

    fix: interval analysis error when have two filterexec that inner filter 
proves zero selectivity (#20743)
    
    ## Which issue does this PR close?
    
    <!--
    We generally require a GitHub issue to be filed for all bug fixes and
    enhancements and this helps us generate change logs for our releases.
    You can link an issue to this PR using the GitHub syntax. For example
    `Closes #123` indicates that this PR will close issue #123.
    -->
    
    - Closes https://github.com/apache/datafusion/issues/20742
    
    ## Rationale for this change
    
    - see https://github.com/apache/datafusion/issues/20742
    
    ## What changes are included in this PR?
    
    In `collect_new_statistics`, when a filter proves that no rows can match,
    use a typed null (e.g., `ScalarValue::Int32(None)`) instead of the untyped
    `ScalarValue::Null` for the column's min/max/sum values. The column's data
    type is looked up from the schema so that downstream interval analysis can
    still intersect intervals of the same type. If no typed null can be built
    for a column's data type, the code falls back to `ScalarValue::Null`.
    
    ## Are these changes tested?
    
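    Yes. This PR adds one regression test,
    `test_nested_filter_with_zero_selectivity_inner`, which stacks two
    `FilterExec`s where the inner filter proves zero selectivity, and updates
    the expected statistics in existing tests to use typed nulls.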
    
    ## Are there any user-facing changes?
    
    <!--
    If there are user-facing changes then we may require documentation to be
    updated before approving the PR.
    -->
    
    <!--
    If there are any breaking changes to public APIs, please add the `api
    change` label.
    -->
---
 .../physical_optimizer/partition_statistics.rs     | 24 +++---
 datafusion/physical-plan/src/filter.rs             | 91 +++++++++++++++++++---
 2 files changed, 93 insertions(+), 22 deletions(-)

diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
index e4b1f1b261..12ce141b47 100644
--- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs
+++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
@@ -390,17 +390,17 @@ mod test {
             column_statistics: vec![
                 ColumnStatistics {
                     null_count: Precision::Exact(0),
-                    max_value: Precision::Exact(ScalarValue::Null),
-                    min_value: Precision::Exact(ScalarValue::Null),
-                    sum_value: Precision::Exact(ScalarValue::Null),
+                    max_value: Precision::Exact(ScalarValue::Int32(None)),
+                    min_value: Precision::Exact(ScalarValue::Int32(None)),
+                    sum_value: Precision::Exact(ScalarValue::Int32(None)),
                     distinct_count: Precision::Exact(0),
                     byte_size: Precision::Exact(16),
                 },
                 ColumnStatistics {
                     null_count: Precision::Exact(0),
-                    max_value: Precision::Exact(ScalarValue::Null),
-                    min_value: Precision::Exact(ScalarValue::Null),
-                    sum_value: Precision::Exact(ScalarValue::Null),
+                    max_value: Precision::Exact(ScalarValue::Date32(None)),
+                    min_value: Precision::Exact(ScalarValue::Date32(None)),
+                    sum_value: Precision::Exact(ScalarValue::Date32(None)),
                     distinct_count: Precision::Exact(0),
                     byte_size: Precision::Exact(16), // 4 rows * 4 bytes (Date32)
                 },
@@ -419,17 +419,17 @@ mod test {
             column_statistics: vec![
                 ColumnStatistics {
                     null_count: Precision::Exact(0),
-                    max_value: Precision::Exact(ScalarValue::Null),
-                    min_value: Precision::Exact(ScalarValue::Null),
-                    sum_value: Precision::Exact(ScalarValue::Null),
+                    max_value: Precision::Exact(ScalarValue::Int32(None)),
+                    min_value: Precision::Exact(ScalarValue::Int32(None)),
+                    sum_value: Precision::Exact(ScalarValue::Int32(None)),
                     distinct_count: Precision::Exact(0),
                     byte_size: Precision::Exact(8),
                 },
                 ColumnStatistics {
                     null_count: Precision::Exact(0),
-                    max_value: Precision::Exact(ScalarValue::Null),
-                    min_value: Precision::Exact(ScalarValue::Null),
-                    sum_value: Precision::Exact(ScalarValue::Null),
+                    max_value: Precision::Exact(ScalarValue::Date32(None)),
+                    min_value: Precision::Exact(ScalarValue::Date32(None)),
+                    sum_value: Precision::Exact(ScalarValue::Date32(None)),
                     distinct_count: Precision::Exact(0),
                     byte_size: Precision::Exact(8), // 2 rows * 4 bytes (Date32)
                 },
diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs
index 1e6b4e3193..c8b37a247c 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -338,6 +338,7 @@ impl FilterExec {
         let total_byte_size = total_byte_size.with_estimated_selectivity(selectivity);
 
         let column_statistics = collect_new_statistics(
+            schema,
             &input_stats.column_statistics,
             analysis_ctx.boundaries,
         );
@@ -785,6 +786,7 @@ fn interval_bound_to_precision(
 /// is adjusted by using the next/previous value for its data type to convert
 /// it into a closed bound.
 fn collect_new_statistics(
+    schema: &SchemaRef,
     input_column_stats: &[ColumnStatistics],
     analysis_boundaries: Vec<ExprBoundaries>,
 ) -> Vec<ColumnStatistics> {
@@ -801,12 +803,17 @@ fn collect_new_statistics(
                 },
             )| {
                 let Some(interval) = interval else {
-                    // If the interval is `None`, we can say that there are no rows:
+                    // If the interval is `None`, we can say that there are no rows.
+                    // Use a typed null to preserve the column's data type, so that
+                    // downstream interval analysis can still intersect intervals
+                    // of the same type.
+                    let typed_null = ScalarValue::try_from(schema.field(idx).data_type())
+                        .unwrap_or(ScalarValue::Null);
                     return ColumnStatistics {
                         null_count: Precision::Exact(0),
-                        max_value: Precision::Exact(ScalarValue::Null),
-                        min_value: Precision::Exact(ScalarValue::Null),
-                        sum_value: Precision::Exact(ScalarValue::Null),
+                        max_value: Precision::Exact(typed_null.clone()),
+                        min_value: Precision::Exact(typed_null.clone()),
+                        sum_value: Precision::Exact(typed_null),
                         distinct_count: Precision::Exact(0),
                         byte_size: input_column_stats[idx].byte_size,
                     };
@@ -1497,17 +1504,17 @@ mod tests {
             statistics.column_statistics,
             vec![
                 ColumnStatistics {
-                    min_value: Precision::Exact(ScalarValue::Null),
-                    max_value: Precision::Exact(ScalarValue::Null),
-                    sum_value: Precision::Exact(ScalarValue::Null),
+                    min_value: Precision::Exact(ScalarValue::Int32(None)),
+                    max_value: Precision::Exact(ScalarValue::Int32(None)),
+                    sum_value: Precision::Exact(ScalarValue::Int32(None)),
                     distinct_count: Precision::Exact(0),
                     null_count: Precision::Exact(0),
                     byte_size: Precision::Absent,
                 },
                 ColumnStatistics {
-                    min_value: Precision::Exact(ScalarValue::Null),
-                    max_value: Precision::Exact(ScalarValue::Null),
-                    sum_value: Precision::Exact(ScalarValue::Null),
+                    min_value: Precision::Exact(ScalarValue::Int32(None)),
+                    max_value: Precision::Exact(ScalarValue::Int32(None)),
+                    sum_value: Precision::Exact(ScalarValue::Int32(None)),
                     distinct_count: Precision::Exact(0),
                     null_count: Precision::Exact(0),
                     byte_size: Precision::Absent,
@@ -1518,6 +1525,70 @@ mod tests {
         Ok(())
     }
 
+    /// Regression test: stacking two FilterExecs where the inner filter
+    /// proves zero selectivity should not panic with a type mismatch
+    /// during interval intersection.
+    ///
+    /// Previously, when a filter proved no rows could match, the column
+    /// statistics used untyped `ScalarValue::Null` (data type `Null`).
+    /// If an outer FilterExec then tried to analyze its own predicate
+    /// against those statistics, `Interval::intersect` would fail with:
+    ///   "Only intervals with the same data type are intersectable, lhs:Null, rhs:Int32"
+    #[tokio::test]
+    async fn test_nested_filter_with_zero_selectivity_inner() -> Result<()> {
+        // Inner table: a: [1, 100], b: [1, 3]
+        let schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Int32, false),
+        ]);
+        let input = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Precision::Inexact(1000),
+                total_byte_size: Precision::Inexact(4000),
+                column_statistics: vec![
+                    ColumnStatistics {
+                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
+                        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
+                        ..Default::default()
+                    },
+                    ColumnStatistics {
+                        min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
+                        max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
+                        ..Default::default()
+                    },
+                ],
+            },
+            schema,
+        ));
+
+        // Inner filter: a > 200 (impossible given a max=100 → zero selectivity)
+        let inner_predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
+            Arc::new(Column::new("a", 0)),
+            Operator::Gt,
+            Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
+        ));
+        let inner_filter: Arc<dyn ExecutionPlan> =
+            Arc::new(FilterExec::try_new(inner_predicate, input)?);
+
+        // Outer filter: a = 50
+        // Before the fix, this would panic because the inner filter's
+        // zero-selectivity statistics produced Null-typed intervals for
+        // column `a`, which couldn't intersect with the Int32 literal.
+        let outer_predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
+            Arc::new(Column::new("a", 0)),
+            Operator::Eq,
+            Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
+        ));
+        let outer_filter: Arc<dyn ExecutionPlan> =
+            Arc::new(FilterExec::try_new(outer_predicate, inner_filter)?);
+
+        // Should succeed without error
+        let statistics = outer_filter.partition_statistics(None)?;
+        assert_eq!(statistics.num_rows, Precision::Inexact(0));
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_filter_statistics_more_inputs() -> Result<()> {
         let schema = Schema::new(vec![


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to