This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch branch-53
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/branch-53 by this push:
     new 76751f1674 [branch-53] fix: interval analysis error when there are two 
FilterExecs and the inner filter proves zero selectivity (#20743) (#20882)
76751f1674 is described below

commit 76751f1674758a61a1517bd1f88e30af0e248843
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Mar 12 08:48:42 2026 -0400

    [branch-53] fix: interval analysis error when there are two FilterExecs and 
the inner filter proves zero selectivity (#20743) (#20882)
    
    - Part of https://github.com/apache/datafusion/issues/19692
    - Closes https://github.com/apache/datafusion/issues/20742 on branch-53
    
    This PR:
    - Backports https://github.com/apache/datafusion/pull/20743 from
    @haohuaijin to the branch-53 line
    
    Co-authored-by: Huaijin <[email protected]>
---
 .../physical_optimizer/partition_statistics.rs     | 24 +++---
 datafusion/physical-plan/src/filter.rs             | 91 +++++++++++++++++++---
 2 files changed, 93 insertions(+), 22 deletions(-)

diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs 
b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
index fa021ed3dc..d73db6fe74 100644
--- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs
+++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
@@ -386,17 +386,17 @@ mod test {
             column_statistics: vec![
                 ColumnStatistics {
                     null_count: Precision::Exact(0),
-                    max_value: Precision::Exact(ScalarValue::Null),
-                    min_value: Precision::Exact(ScalarValue::Null),
-                    sum_value: Precision::Exact(ScalarValue::Null),
+                    max_value: Precision::Exact(ScalarValue::Int32(None)),
+                    min_value: Precision::Exact(ScalarValue::Int32(None)),
+                    sum_value: Precision::Exact(ScalarValue::Int32(None)),
                     distinct_count: Precision::Exact(0),
                     byte_size: Precision::Exact(16),
                 },
                 ColumnStatistics {
                     null_count: Precision::Exact(0),
-                    max_value: Precision::Exact(ScalarValue::Null),
-                    min_value: Precision::Exact(ScalarValue::Null),
-                    sum_value: Precision::Exact(ScalarValue::Null),
+                    max_value: Precision::Exact(ScalarValue::Date32(None)),
+                    min_value: Precision::Exact(ScalarValue::Date32(None)),
+                    sum_value: Precision::Exact(ScalarValue::Date32(None)),
                     distinct_count: Precision::Exact(0),
                     byte_size: Precision::Exact(16), // 4 rows * 4 bytes 
(Date32)
                 },
@@ -415,17 +415,17 @@ mod test {
             column_statistics: vec![
                 ColumnStatistics {
                     null_count: Precision::Exact(0),
-                    max_value: Precision::Exact(ScalarValue::Null),
-                    min_value: Precision::Exact(ScalarValue::Null),
-                    sum_value: Precision::Exact(ScalarValue::Null),
+                    max_value: Precision::Exact(ScalarValue::Int32(None)),
+                    min_value: Precision::Exact(ScalarValue::Int32(None)),
+                    sum_value: Precision::Exact(ScalarValue::Int32(None)),
                     distinct_count: Precision::Exact(0),
                     byte_size: Precision::Exact(8),
                 },
                 ColumnStatistics {
                     null_count: Precision::Exact(0),
-                    max_value: Precision::Exact(ScalarValue::Null),
-                    min_value: Precision::Exact(ScalarValue::Null),
-                    sum_value: Precision::Exact(ScalarValue::Null),
+                    max_value: Precision::Exact(ScalarValue::Date32(None)),
+                    min_value: Precision::Exact(ScalarValue::Date32(None)),
+                    sum_value: Precision::Exact(ScalarValue::Date32(None)),
                     distinct_count: Precision::Exact(0),
                     byte_size: Precision::Exact(8), // 2 rows * 4 bytes 
(Date32)
                 },
diff --git a/datafusion/physical-plan/src/filter.rs 
b/datafusion/physical-plan/src/filter.rs
index ecea4e6ebe..af7bcc8e3b 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -337,6 +337,7 @@ impl FilterExec {
         let total_byte_size = 
total_byte_size.with_estimated_selectivity(selectivity);
 
         let column_statistics = collect_new_statistics(
+            schema,
             &input_stats.column_statistics,
             analysis_ctx.boundaries,
         );
@@ -757,6 +758,7 @@ impl EmbeddedProjection for FilterExec {
 /// is adjusted by using the next/previous value for its data type to convert
 /// it into a closed bound.
 fn collect_new_statistics(
+    schema: &SchemaRef,
     input_column_stats: &[ColumnStatistics],
     analysis_boundaries: Vec<ExprBoundaries>,
 ) -> Vec<ColumnStatistics> {
@@ -773,12 +775,17 @@ fn collect_new_statistics(
                 },
             )| {
                 let Some(interval) = interval else {
-                    // If the interval is `None`, we can say that there are no 
rows:
+                    // If the interval is `None`, we can say that there are no 
rows.
+                    // Use a typed null to preserve the column's data type, so 
that
+                    // downstream interval analysis can still intersect 
intervals
+                    // of the same type.
+                    let typed_null = 
ScalarValue::try_from(schema.field(idx).data_type())
+                        .unwrap_or(ScalarValue::Null);
                     return ColumnStatistics {
                         null_count: Precision::Exact(0),
-                        max_value: Precision::Exact(ScalarValue::Null),
-                        min_value: Precision::Exact(ScalarValue::Null),
-                        sum_value: Precision::Exact(ScalarValue::Null),
+                        max_value: Precision::Exact(typed_null.clone()),
+                        min_value: Precision::Exact(typed_null.clone()),
+                        sum_value: Precision::Exact(typed_null),
                         distinct_count: Precision::Exact(0),
                         byte_size: input_column_stats[idx].byte_size,
                     };
@@ -1471,17 +1478,17 @@ mod tests {
             statistics.column_statistics,
             vec![
                 ColumnStatistics {
-                    min_value: Precision::Exact(ScalarValue::Null),
-                    max_value: Precision::Exact(ScalarValue::Null),
-                    sum_value: Precision::Exact(ScalarValue::Null),
+                    min_value: Precision::Exact(ScalarValue::Int32(None)),
+                    max_value: Precision::Exact(ScalarValue::Int32(None)),
+                    sum_value: Precision::Exact(ScalarValue::Int32(None)),
                     distinct_count: Precision::Exact(0),
                     null_count: Precision::Exact(0),
                     byte_size: Precision::Absent,
                 },
                 ColumnStatistics {
-                    min_value: Precision::Exact(ScalarValue::Null),
-                    max_value: Precision::Exact(ScalarValue::Null),
-                    sum_value: Precision::Exact(ScalarValue::Null),
+                    min_value: Precision::Exact(ScalarValue::Int32(None)),
+                    max_value: Precision::Exact(ScalarValue::Int32(None)),
+                    sum_value: Precision::Exact(ScalarValue::Int32(None)),
                     distinct_count: Precision::Exact(0),
                     null_count: Precision::Exact(0),
                     byte_size: Precision::Absent,
@@ -1492,6 +1499,70 @@ mod tests {
         Ok(())
     }
 
+    /// Regression test: stacking two FilterExecs where the inner filter
+    /// proves zero selectivity should not panic with a type mismatch
+    /// during interval intersection.
+    ///
+    /// Previously, when a filter proved no rows could match, the column
+    /// statistics used untyped `ScalarValue::Null` (data type `Null`).
+    /// If an outer FilterExec then tried to analyze its own predicate
+    /// against those statistics, `Interval::intersect` would fail with:
+    ///   "Only intervals with the same data type are intersectable, lhs:Null, 
rhs:Int32"
+    #[tokio::test]
+    async fn test_nested_filter_with_zero_selectivity_inner() -> Result<()> {
+        // Inner table: a: [1, 100], b: [1, 3]
+        let schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Int32, false),
+        ]);
+        let input = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Precision::Inexact(1000),
+                total_byte_size: Precision::Inexact(4000),
+                column_statistics: vec![
+                    ColumnStatistics {
+                        min_value: 
Precision::Inexact(ScalarValue::Int32(Some(1))),
+                        max_value: 
Precision::Inexact(ScalarValue::Int32(Some(100))),
+                        ..Default::default()
+                    },
+                    ColumnStatistics {
+                        min_value: 
Precision::Inexact(ScalarValue::Int32(Some(1))),
+                        max_value: 
Precision::Inexact(ScalarValue::Int32(Some(3))),
+                        ..Default::default()
+                    },
+                ],
+            },
+            schema,
+        ));
+
+        // Inner filter: a > 200 (impossible given a max=100 → zero 
selectivity)
+        let inner_predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
+            Arc::new(Column::new("a", 0)),
+            Operator::Gt,
+            Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
+        ));
+        let inner_filter: Arc<dyn ExecutionPlan> =
+            Arc::new(FilterExec::try_new(inner_predicate, input)?);
+
+        // Outer filter: a = 50
+        // Before the fix, this would panic because the inner filter's
+        // zero-selectivity statistics produced Null-typed intervals for
+        // column `a`, which couldn't intersect with the Int32 literal.
+        let outer_predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
+            Arc::new(Column::new("a", 0)),
+            Operator::Eq,
+            Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
+        ));
+        let outer_filter: Arc<dyn ExecutionPlan> =
+            Arc::new(FilterExec::try_new(outer_predicate, inner_filter)?);
+
+        // Should succeed without error
+        let statistics = outer_filter.partition_statistics(None)?;
+        assert_eq!(statistics.num_rows, Precision::Inexact(0));
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_filter_statistics_more_inputs() -> Result<()> {
         let schema = Schema::new(vec![


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to