This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch 
gh-readonly-queue/main/pr-22762-71d22b73e8c2c2df4d1edfec3e72fdc446553785
in repository https://gitbox.apache.org/repos/asf/datafusion.git

commit 3c4034c8d0685bb70e8e94f6a8952d309e3b9c41
Author: Neil Conway <[email protected]>
AuthorDate: Sat Jun 6 01:58:53 2026 -0400

    fix: Scale semi/anti-join column stats by estimated row count (#22762)
    
    ## Which issue does this PR close?
    
    - Closes #22743
    
    ## Rationale for this change
    
    This PR makes several related improvements/fixes to the stats code for
    semi- and anti-joins:
    
    1. Scale per-column stats using the estimated output row count, rather
    than just reusing the stats from the preserved side of the join.
    2. Compute `total_byte_size` for semi/anti-join results by summing the
    per-column `byte_size` values, instead of always emitting `Absent`. We
    still emit `Absent` for other join types, and whenever any per-column
    `byte_size` value is `Absent`.
    3. Pass in the join's `NullEquality` semantics and use them for stats:
    under `NullEqualsNothing`, null join keys never match (so a semi-join can
    report `Exact(0)` as the `null_count` of its join-key columns), whereas
    under `NullEqualsNull` nulls are treated just like any other value.
    
    ## What changes are included in this PR?
    
    * Stats improvements described above
    * Some refactoring and cleanup
    * New unit tests
    * Update test expectations where needed
    
    ## Are these changes tested?
    
    Yes; new tests added.
    
    ## Are there any user-facing changes?
    
    Some queries might get different plans.
---
 .../physical_optimizer/partition_statistics.rs     |  21 +-
 .../physical-plan/src/joins/hash_join/exec.rs      |   3 +
 .../physical-plan/src/joins/nested_loop_join.rs    |   6 +-
 .../src/joins/sort_merge_join/exec.rs              |   1 +
 datafusion/physical-plan/src/joins/utils.rs        | 571 +++++++++++++++++++--
 5 files changed, 548 insertions(+), 54 deletions(-)

diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs 
b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
index 181b7de7d9..4fba94ec3a 100644
--- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs
+++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
@@ -661,39 +661,36 @@ mod test {
         let full_statistics = nested_loop_join.partition_statistics(None)?;
         // With empty join columns, estimate_join_statistics returns Inexact 
row count
         // based on the outer side (right side for RightSemi)
-        let mut expected_full_statistics = create_partition_statistics(
+        let expected_full_statistics = create_partition_statistics(
             4,
             32,
             1,
             4,
             Some((DATE_2025_03_01, DATE_2025_03_04)),
-        );
-        expected_full_statistics.num_rows = Precision::Inexact(4);
-        expected_full_statistics.total_byte_size = Precision::Absent;
+        )
+        .to_inexact();
         assert_eq!(*full_statistics, expected_full_statistics);
 
         // Test partition_statistics(Some(idx)) - returns partition-specific 
statistics
         // Partition 1: ids [3,4], dates [2025-03-01, 2025-03-02]
-        let mut expected_statistic_partition_1 = create_partition_statistics(
+        let expected_statistic_partition_1 = create_partition_statistics(
             2,
             16,
             3,
             4,
             Some((DATE_2025_03_01, DATE_2025_03_02)),
-        );
-        expected_statistic_partition_1.num_rows = Precision::Inexact(2);
-        expected_statistic_partition_1.total_byte_size = Precision::Absent;
+        )
+        .to_inexact();
 
         // Partition 2: ids [1,2], dates [2025-03-03, 2025-03-04]
-        let mut expected_statistic_partition_2 = create_partition_statistics(
+        let expected_statistic_partition_2 = create_partition_statistics(
             2,
             16,
             1,
             2,
             Some((DATE_2025_03_03, DATE_2025_03_04)),
-        );
-        expected_statistic_partition_2.num_rows = Precision::Inexact(2);
-        expected_statistic_partition_2.total_byte_size = Precision::Absent;
+        )
+        .to_inexact();
 
         let statistics = 
(0..nested_loop_join.output_partitioning().partition_count())
             .map(|idx| nested_loop_join.partition_statistics(Some(idx)))
diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs 
b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index 03387c316b..3774a30020 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -1448,6 +1448,7 @@ impl ExecutionPlan for HashJoinExec {
                     Arc::unwrap_or_clone(left_stats),
                     Arc::unwrap_or_clone(right_stats),
                     &self.on,
+                    self.null_equality,
                     &self.join_type,
                     &self.join_schema,
                 )?
@@ -1463,6 +1464,7 @@ impl ExecutionPlan for HashJoinExec {
                     Arc::unwrap_or_clone(left_stats),
                     Arc::unwrap_or_clone(right_stats),
                     &self.on,
+                    self.null_equality,
                     &self.join_type,
                     &self.join_schema,
                 )?
@@ -1480,6 +1482,7 @@ impl ExecutionPlan for HashJoinExec {
                     Arc::unwrap_or_clone(left_stats),
                     Arc::unwrap_or_clone(right_stats),
                     &self.on,
+                    self.null_equality,
                     &self.join_type,
                     &self.join_schema,
                 )?
diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs 
b/datafusion/physical-plan/src/joins/nested_loop_join.rs
index 15af23b447..a18ec0cbe4 100644
--- a/datafusion/physical-plan/src/joins/nested_loop_join.rs
+++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -61,8 +61,9 @@ use arrow::record_batch::RecordBatch;
 use arrow_schema::DataType;
 use datafusion_common::cast::as_boolean_array;
 use datafusion_common::{
-    JoinSide, Result, ScalarValue, Statistics, arrow_err, 
assert_eq_or_internal_err,
-    internal_datafusion_err, internal_err, project_schema, 
unwrap_or_internal_err,
+    JoinSide, NullEquality, Result, ScalarValue, Statistics, arrow_err,
+    assert_eq_or_internal_err, internal_datafusion_err, internal_err, 
project_schema,
+    unwrap_or_internal_err,
 };
 use datafusion_execution::TaskContext;
 use datafusion_execution::disk_manager::RefCountedTempFile;
@@ -713,6 +714,7 @@ impl ExecutionPlan for NestedLoopJoinExec {
             left_stats,
             right_stats,
             &join_columns,
+            NullEquality::NullEqualsNothing,
             &self.join_type,
             &self.join_schema,
         )?;
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs 
b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs
index 9e87b52696..a86cb647e4 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs
@@ -582,6 +582,7 @@ impl ExecutionPlan for SortMergeJoinExec {
             left_stats,
             right_stats,
             &self.on,
+            self.null_equality,
             &self.join_type,
             &self.schema,
         )?))
diff --git a/datafusion/physical-plan/src/joins/utils.rs 
b/datafusion/physical-plan/src/joins/utils.rs
index 5918097194..8cc93dee57 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -417,6 +417,7 @@ impl<T> Clone for OnceFut<T> {
 #[derive(Clone, Debug, Default)]
 struct PartialJoinStatistics {
     pub num_rows: usize,
+    pub total_byte_size: Precision<usize>,
     pub column_statistics: Vec<ColumnStatistics>,
 }
 
@@ -430,9 +431,11 @@ struct PartialJoinStatistics {
 ///   column-level statistics (distinct counts, min/max values) of the join 
keys.
 /// - **Column statistics**: Combines column statistics from both inputs. For 
join types
 ///   that preserve all columns (Inner, Left, Right, Full), statistics from 
both sides
-///   are concatenated. For semi/anti joins, only the relevant side's 
statistics are kept.
-/// - **Byte size**: Always returns `Precision::Absent` as join output size is 
difficult
-///   to estimate without knowing the actual data.
+///   are concatenated. For semi/anti joins, the preserved side's statistics 
are
+///   normalized as subset estimates.
+/// - **Byte size**: For semi/anti joins, sums normalized column byte-size 
estimates
+///   when every output column has one. Other join types return 
`Precision::Absent`
+///   because join output size is difficult to estimate without knowing the 
actual data.
 ///
 /// # The `on` Parameter
 ///
@@ -446,24 +449,34 @@ struct PartialJoinStatistics {
 /// - Does not account for selectivity of arbitrary join filter expressions
 ///   (e.g., `(t1.v1 + t2.v1) % 2 = 0`). Such filters, common in 
NestedLoopJoinExec,
 ///   are not factored into the cardinality estimation.
-/// - Column statistics for the output are simply combined from inputs without
-///   adjusting for join selectivity (acknowledged in the code as needing
-///   "filter selectivity analysis").
+/// - Column statistics for inner/outer joins are simply combined from inputs
+///   without adjusting for join selectivity (acknowledged in the code as
+///   needing "filter selectivity analysis").
 pub(crate) fn estimate_join_statistics(
     left_stats: Statistics,
     right_stats: Statistics,
     on: &JoinOn,
+    null_equality: NullEquality,
     join_type: &JoinType,
     schema: &Schema,
 ) -> Result<Statistics> {
-    let join_stats = estimate_join_cardinality(join_type, left_stats, 
right_stats, on);
-    let (num_rows, column_statistics) = match join_stats {
-        Some(stats) => (Precision::Inexact(stats.num_rows), 
stats.column_statistics),
-        None => (Precision::Absent, Statistics::unknown_column(schema)),
+    let join_stats =
+        estimate_join_cardinality(join_type, left_stats, right_stats, on, 
null_equality);
+    let (num_rows, total_byte_size, column_statistics) = match join_stats {
+        Some(stats) => (
+            Precision::Inexact(stats.num_rows),
+            stats.total_byte_size,
+            stats.column_statistics,
+        ),
+        None => (
+            Precision::Absent,
+            Precision::Absent,
+            Statistics::unknown_column(schema),
+        ),
     };
     Ok(Statistics {
         num_rows,
-        total_byte_size: Precision::Absent,
+        total_byte_size,
         column_statistics,
     })
 }
@@ -474,23 +487,24 @@ fn estimate_join_cardinality(
     left_stats: Statistics,
     right_stats: Statistics,
     on: &JoinOn,
+    null_equality: NullEquality,
 ) -> Option<PartialJoinStatistics> {
-    let (left_key_stats, right_key_stats) = on
+    let on_column_indices = on
         .iter()
-        .map(|(left, right)| {
-            match (
-                left.downcast_ref::<Column>(),
-                right.downcast_ref::<Column>(),
-            ) {
-                (Some(left), Some(right)) => (
-                    left_stats.column_statistics[left.index()].clone(),
-                    right_stats.column_statistics[right.index()].clone(),
-                ),
-                _ => (
-                    ColumnStatistics::new_unknown(),
-                    ColumnStatistics::new_unknown(),
-                ),
-            }
+        .map(|(left, right)| equijoin_column_indices(left, right))
+        .collect::<Vec<_>>();
+
+    let (left_key_stats, right_key_stats) = on_column_indices
+        .iter()
+        .map(|indices| match indices {
+            Some((left_index, right_index)) => (
+                left_stats.column_statistics[*left_index].clone(),
+                right_stats.column_statistics[*right_index].clone(),
+            ),
+            None => (
+                ColumnStatistics::new_unknown(),
+                ColumnStatistics::new_unknown(),
+            ),
         })
         .unzip::<_, _, Vec<_>, Vec<_>>();
 
@@ -526,6 +540,7 @@ fn estimate_join_cardinality(
 
             Some(PartialJoinStatistics {
                 num_rows: *cardinality.get_value()?,
+                total_byte_size: Precision::Absent,
                 // We don't do anything specific here, just combine the 
existing
                 // statistics which might yield subpar results (although it is
                 // true, esp regarding min/max). For a better estimation, we 
need
@@ -547,9 +562,9 @@ fn estimate_join_cardinality(
 
             let (outer_stats, inner_stats, outer_key_stats, inner_key_stats) = 
if is_left
             {
-                (&left_stats, &right_stats, &left_key_stats, &right_key_stats)
+                (left_stats, right_stats, left_key_stats, right_key_stats)
             } else {
-                (&right_stats, &left_stats, &right_key_stats, &left_key_stats)
+                (right_stats, left_stats, right_key_stats, left_key_stats)
             };
 
             let outer_rows = *outer_stats.num_rows.get_value()?;
@@ -575,8 +590,9 @@ fn estimate_join_cardinality(
                     estimate_semi_join_cardinality(
                         &outer_stats.num_rows,
                         &inner_stats.num_rows,
-                        outer_key_stats,
-                        inner_key_stats,
+                        &outer_key_stats,
+                        &inner_key_stats,
+                        null_equality,
                     )
                 };
 
@@ -588,10 +604,37 @@ fn estimate_join_cardinality(
                 (None, _) => outer_rows,
             };
 
-            let outer_stats = if is_left { left_stats } else { right_stats };
+            // The outer side is the one whose columns a semi/anti join emits, 
so
+            // its statistics are the ones to normalize into the subset 
estimate.
+            let Statistics {
+                num_rows: preserved_num_rows,
+                column_statistics: preserved_column_statistics,
+                ..
+            } = outer_stats;
+            let preserved_join_key_indices = on_column_indices
+                .iter()
+                .filter_map(|&indices| {
+                    indices.map(
+                        |(left_index, right_index)| {
+                            if is_left { left_index } else { right_index }
+                        },
+                    )
+                })
+                .collect::<Vec<_>>();
+            let column_statistics = normalize_semi_anti_join_column_statistics(
+                preserved_column_statistics,
+                &preserved_num_rows,
+                cardinality,
+                &preserved_join_key_indices,
+                is_anti,
+                null_equality,
+            );
+            let total_byte_size =
+                total_byte_size_from_column_statistics(&column_statistics);
             Some(PartialJoinStatistics {
                 num_rows: cardinality,
-                column_statistics: outer_stats.column_statistics,
+                total_byte_size,
+                column_statistics,
             })
         }
 
@@ -601,6 +644,7 @@ fn estimate_join_cardinality(
             column_statistics.push(ColumnStatistics::new_unknown());
             Some(PartialJoinStatistics {
                 num_rows,
+                total_byte_size: Precision::Absent,
                 column_statistics,
             })
         }
@@ -610,12 +654,132 @@ fn estimate_join_cardinality(
             column_statistics.push(ColumnStatistics::new_unknown());
             Some(PartialJoinStatistics {
                 num_rows,
+                total_byte_size: Precision::Absent,
                 column_statistics,
             })
         }
     }
 }
 
+fn equijoin_column_indices( // Resolves one equi-join key pair to its (left, right) column indices.
+    left: &PhysicalExprRef,
+    right: &PhysicalExprRef,
+) -> Option<(usize, usize)> { // `None` unless both expressions downcast to `Column`
+    Some((
+        left.downcast_ref::<Column>()?.index(), // `?` returns `None` when `left` is not a `Column`
+        right.downcast_ref::<Column>()?.index(), // likewise for `right`
+    ))
+}
+
+/// Adjusts the preserved input's column statistics to describe the subset of
+/// rows a semi or anti join emits. Most values become estimates (marked
+/// inexact) bounded by the smaller output row count:
+///
+/// - `null_count` and `byte_size` are scaled by the output/input row ratio.
+/// - `distinct_count` is capped at the number of non-null output rows.
+/// - `sum_value` is dropped, since the input sum does not apply to the subset.
+///
+/// Join-key columns are the exception for `null_count`: under regular SQL
+/// equality, null keys never match, so a semi join keeps none of those rows 
and
+/// an anti join keeps all of them. Under null-equal joins, null keys can match
+/// and are treated like the rest of the subset.
+fn normalize_semi_anti_join_column_statistics(
+    column_statistics: Vec<ColumnStatistics>, // per-column stats of the preserved input, consumed
+    input_num_rows: &Precision<usize>, // row count of the preserved input
+    output_num_rows: usize, // estimated row count of the join output
+    join_key_indices: &[usize], // positions in `column_statistics` that are join keys
+    is_anti: bool,
+    null_equality: NullEquality,
+) -> Vec<ColumnStatistics> {
+    let input_num_rows = input_num_rows.get_value().copied().unwrap_or(0); // falls back to 0 when the input row count has no value
+
+    column_statistics
+        .into_iter()
+        .enumerate()
+        .map(|(idx, stats)| {
+            let mut stats = stats.to_inexact(); // the output is an estimated subset, so nothing carried over stays exact
+            stats.null_count = if join_key_indices.contains(&idx) { // join keys get null-equality-aware handling
+                normalize_semi_anti_join_key_null_count(
+                    stats.null_count,
+                    input_num_rows,
+                    output_num_rows,
+                    is_anti,
+                    null_equality,
+                )
+            } else { // other columns: scale nulls by the output/input row ratio
+                scale_subset_count(stats.null_count, input_num_rows, 
output_num_rows)
+                    .min(&Precision::Inexact(output_num_rows)) // bound the estimate by the output row count
+            };
+            let max_distinct_count = stats // upper bound for distinct values: non-null output rows
+                .null_count
+                .get_value()
+                .map(|null_count| output_num_rows.saturating_sub(*null_count)) // saturating: never underflows
+                .unwrap_or(output_num_rows); // no null count available: bound by all output rows
+            stats.distinct_count = stats
+                .distinct_count
+                .min(&Precision::Inexact(max_distinct_count));
+            stats.byte_size =
+                scale_subset_count(stats.byte_size, input_num_rows, 
output_num_rows);
+            stats.sum_value = Precision::Absent; // the input sum does not apply to the subset
+            stats
+        })
+        .collect()
+}
+
+fn normalize_semi_anti_join_key_null_count( // Estimates the null count of a join-key column in semi/anti-join output.
+    null_count: Precision<usize>, // null count of the key column in the preserved input
+    input_num_rows: usize,
+    output_num_rows: usize,
+    is_anti: bool, // true for anti joins, false for semi joins
+    null_equality: NullEquality,
+) -> Precision<usize> {
+    match (is_anti, null_equality) {
+        (false, NullEquality::NullEqualsNothing) => Precision::Exact(0), // semi join: null keys never match, so none are emitted
+        (true, NullEquality::NullEqualsNothing) => null_count // anti join: null keys never match, so those rows are all kept
+            .to_inexact()
+            .min(&Precision::Inexact(output_num_rows)), // bounded by the estimated output row count
+        (_, NullEquality::NullEqualsNull) => { // nulls can match: scale them like any other subset count
+            scale_subset_count(null_count, input_num_rows, output_num_rows)
+                .min(&Precision::Inexact(output_num_rows))
+        }
+    }
+}
+
+// Scale a column-level count to an estimated row subset. Rounding up keeps a
+// small non-zero count from disappearing solely because the subset is small.
+fn scale_subset_count(
+    count: Precision<usize>,
+    input_num_rows: usize,
+    output_num_rows: usize,
+) -> Precision<usize> {
+    let scaled = match count {
+        Precision::Exact(count) | Precision::Inexact(count) => {
+            if input_num_rows == 0 { // guard the division below; a zero-row input scales to 0
+                0
+            } else {
+                (count as u128 * output_num_rows as 
u128).div_ceil(input_num_rows as u128)
+                    as usize // multiply in u128, round the quotient up, then narrow back to usize
+            }
+        }
+        Precision::Absent => return Precision::Absent, // nothing to scale
+    };
+
+    Precision::Inexact(scaled) // always an estimate, even when the input count was exact
+}
+
+fn total_byte_size_from_column_statistics( // Sums the per-column `byte_size` estimates into one total.
+    column_statistics: &[ColumnStatistics],
+) -> Precision<usize> {
+    column_statistics
+        .iter()
+        .map(|stats| stats.byte_size.get_value().copied()) // one optional byte size per column
+        .try_fold(0usize, |acc, byte_size| { // stops at the first column without a byte size
+            byte_size.map(|byte_size| acc.saturating_add(byte_size)) // saturating: clamps instead of overflowing
+        })
+        .map(Precision::Inexact) // a complete sum is still reported as an estimate
+        .unwrap_or(Precision::Absent) // any missing column size makes the total `Absent`
+}
+
 /// Estimate the inner join cardinality by using the basic building blocks of
 /// column-level statistics and the total row count. This is a very naive and
 /// a very conservative implementation that can quickly give up if there is not
@@ -640,6 +804,13 @@ fn estimate_inner_join_cardinality(
         ..
     } = right_stats;
 
+    if left_num_rows == Precision::Exact(0) || right_num_rows == 
Precision::Exact(0) {
+        return Some(Precision::Exact(0));
+    }
+    if left_num_rows == Precision::Inexact(0) || right_num_rows == 
Precision::Inexact(0) {
+        return Some(Precision::Inexact(0));
+    }
+
     // Follow Spark Catalyst's conservative NDV join estimate: for multi-key
     // joins, use the most selective key instead of multiplying all key 
denominators.
     let mut join_selectivity = Precision::Absent;
@@ -743,8 +914,8 @@ fn estimate_disjoint_inputs(
 /// Under the uniformity assumption (each distinct value contributes
 /// equally to row counts), the surviving fraction of outer rows is:
 ///
-/// Null rows cannot match, so each column's selectivity is further
-/// reduced by the outer null fraction:
+/// Under regular SQL equality, null rows cannot match, so each column's
+/// selectivity is further reduced by the outer null fraction:
 ///
 /// ```text
 /// null_frac_i = outer_null_count_i / outer_rows
@@ -761,7 +932,7 @@ fn estimate_disjoint_inputs(
 /// Anti join cardinality is derived as the complement:
 /// `outer_rows - semi_cardinality`.
 ///
-/// Boundary cases:
+/// With `NullEqualsNothing`, boundary cases are:
 /// * `inner_ndv >= outer_ndv` → selectivity = `1.0 - null_frac`
 /// * `null_frac = 1.0` → selectivity = 0.0 (no non-null rows can match)
 /// * Missing NDV statistics → returns `None` (fallback to `outer_rows`)
@@ -776,6 +947,7 @@ fn estimate_semi_join_cardinality(
     inner_num_rows: &Precision<usize>,
     outer_key_stats: &[ColumnStatistics],
     inner_key_stats: &[ColumnStatistics],
+    null_equality: NullEquality,
 ) -> Option<usize> {
     let outer_rows = *outer_num_rows.get_value()?;
     if outer_rows == 0 {
@@ -806,11 +978,21 @@ fn estimate_semi_join_cardinality(
         if let (Some(&o), Some(&i)) = (outer_ndv.get_value(), 
inner_ndv.get_value())
             && o > 0
         {
-            let null_frac = outer_stat
-                .null_count
-                .get_value()
-                .map(|&nc| nc as f64 / outer_rows as f64)
-                .unwrap_or(0.0);
+            let null_frac = if null_equality == 
NullEquality::NullEqualsNothing {
+                outer_stat
+                    .null_count
+                    .get_value()
+                    .map(|&nc| {
+                        if nc > outer_rows {
+                            0.0
+                        } else {
+                            nc as f64 / outer_rows as f64
+                        }
+                    })
+                    .unwrap_or(0.0)
+            } else {
+                0.0
+            };
             selectivity *= (o.min(i) as f64) / (o as f64) * (1.0 - null_frac);
             has_selectivity_estimate = true;
         }
@@ -2590,6 +2772,7 @@ mod tests {
                 create_stats(Some(left_num_rows), left_col_stats.clone(), 
false),
                 create_stats(Some(right_num_rows), right_col_stats.clone(), 
false),
                 &join_on,
+                NullEquality::NullEqualsNothing,
             );
 
             assert_eq!(
@@ -2722,6 +2905,7 @@ mod tests {
                 create_stats(Some(1000), left_col_stats.clone(), false),
                 create_stats(Some(2000), right_col_stats.clone(), false),
                 &join_on,
+                NullEquality::NullEqualsNothing,
             )
             .unwrap();
             assert_eq!(partial_join_stats.num_rows, expected_num_rows);
@@ -2775,6 +2959,7 @@ mod tests {
             create_stats(Some(1000), left_col_stats.clone(), false),
             create_stats(Some(2000), right_col_stats.clone(), false),
             &join_on_ab,
+            NullEquality::NullEqualsNothing,
         )
         .unwrap();
         let stats_ba = estimate_join_cardinality(
@@ -2782,6 +2967,7 @@ mod tests {
             create_stats(Some(1000), left_col_stats.clone(), false),
             create_stats(Some(2000), right_col_stats.clone(), false),
             &join_on_ba,
+            NullEquality::NullEqualsNothing,
         )
         .unwrap();
 
@@ -2855,6 +3041,7 @@ mod tests {
                 create_stats(Some(1000), left_col_stats.clone(), true),
                 create_stats(Some(2000), right_col_stats.clone(), true),
                 &join_on,
+                NullEquality::NullEqualsNothing,
             )
             .unwrap();
             assert_eq!(partial_join_stats.num_rows, expected_num_rows);
@@ -3090,6 +3277,7 @@ mod tests {
                     column_statistics: inner_col_stats,
                 },
                 &join_on,
+                NullEquality::NullEqualsNothing,
             )
             .map(|cardinality| cardinality.num_rows);
 
@@ -3124,6 +3312,7 @@ mod tests {
                 column_statistics: dummy_column_stats.clone(),
             },
             &join_on,
+            NullEquality::NullEqualsNothing,
         );
         assert!(
             absent_outer_estimation.is_none(),
@@ -3143,6 +3332,7 @@ mod tests {
                 column_statistics: dummy_column_stats.clone(),
             },
             &join_on,
+            NullEquality::NullEqualsNothing,
         ).expect("Expected non-empty PartialJoinStatistics for SemiJoin with 
absent inner num_rows");
 
         assert_eq!(
@@ -3163,6 +3353,7 @@ mod tests {
                 column_statistics: dummy_column_stats,
             },
             &join_on,
+            NullEquality::NullEqualsNothing,
         );
         assert!(
             absent_inner_estimation.is_none(),
@@ -3209,6 +3400,7 @@ mod tests {
                 ],
             },
             &join_on,
+            NullEquality::NullEqualsNothing,
         )
         .map(|c| c.num_rows);
         assert_eq!(result, Some(13), "multi-column semi join");
@@ -3233,6 +3425,7 @@ mod tests {
                 ],
             },
             &join_on,
+            NullEquality::NullEqualsNothing,
         )
         .map(|c| c.num_rows);
         assert_eq!(result, Some(87), "multi-column anti join");
@@ -3260,6 +3453,7 @@ mod tests {
                 ],
             },
             &join_on,
+            NullEquality::NullEqualsNothing,
         )
         .map(|c| c.num_rows);
         assert_eq!(result, Some(50), "mixed stats: col1 skipped");
@@ -3284,6 +3478,7 @@ mod tests {
                 ],
             },
             &join_on,
+            NullEquality::NullEqualsNothing,
         )
         .map(|c| c.num_rows);
         assert_eq!(result, Some(100), "no column has stats on both sides");
@@ -3312,6 +3507,7 @@ mod tests {
                 ],
             },
             &join_on,
+            NullEquality::NullEqualsNothing,
         )
         .map(|c| c.num_rows);
         assert_eq!(
@@ -3353,6 +3549,7 @@ mod tests {
             left_stats.clone(),
             right_stats.clone(),
             &join_on,
+            NullEquality::NullEqualsNothing,
         )
         .map(|c| c.num_rows);
         assert_eq!(left_semi, Some(50));
@@ -3362,11 +3559,305 @@ mod tests {
             left_stats,
             right_stats,
             &join_on,
+            NullEquality::NullEqualsNothing,
         )
         .map(|c| c.num_rows);
         assert_eq!(left_anti, Some(0));
     }
 
+    #[test]
+    fn test_semi_join_scales_preserved_column_statistics() {
+        let join_on = vec![(
+            Arc::new(Column::new("l_key", 0)) as _,
+            Arc::new(Column::new("r_key", 0)) as _,
+        )];
+
+        let result = estimate_join_cardinality(
+            &JoinType::LeftSemi,
+            Statistics {
+                num_rows: Inexact(432_187),
+                total_byte_size: Absent,
+                column_statistics: vec![
+                    ColumnStatistics {
+                        null_count: Exact(7_196),
+                        min_value: Exact(ScalarValue::from(1_i64)),
+                        max_value: Exact(ScalarValue::from(432_187_i64)),
+                        sum_value: Absent,
+                        distinct_count: Absent,
+                        byte_size: Exact(3_457_496),
+                    },
+                    ColumnStatistics {
+                        null_count: Exact(7_196),
+                        min_value: Exact(ScalarValue::from(1_i64)),
+                        max_value: Exact(ScalarValue::from(432_187_i64)),
+                        sum_value: Exact(ScalarValue::from(1_000_000_i64)),
+                        distinct_count: Exact(500_000),
+                        byte_size: Exact(3_457_496),
+                    },
+                ],
+            },
+            Statistics {
+                num_rows: Inexact(32),
+                total_byte_size: Absent,
+                column_statistics: vec![create_column_stats(
+                    Inexact(1),
+                    Inexact(32),
+                    Absent,
+                    Absent,
+                )],
+            },
+            &join_on,
+            NullEquality::NullEqualsNothing,
+        )
+        .expect("semi join cardinality should be estimated");
+
+        assert_eq!(result.num_rows, 32);
+        assert_eq!(result.total_byte_size, Inexact(512));
+        assert_eq!(result.column_statistics[0].null_count, Exact(0));
+        assert_eq!(result.column_statistics[0].distinct_count, Absent);
+        assert_eq!(
+            result.column_statistics[0].min_value,
+            Inexact(ScalarValue::from(1_i64))
+        );
+        assert_eq!(
+            result.column_statistics[0].max_value,
+            Inexact(ScalarValue::from(432_187_i64))
+        );
+        assert_eq!(result.column_statistics[0].byte_size, Inexact(256));
+        assert_eq!(result.column_statistics[1].null_count, Inexact(1));
+        // distinct_count is capped at the non-null output rows (32 - 1).
+        assert_eq!(result.column_statistics[1].distinct_count, Inexact(31));
+        assert_eq!(result.column_statistics[1].sum_value, Absent);
+        assert_eq!(result.column_statistics[1].byte_size, Inexact(256));
+    }
+
+    #[test]
+    fn test_semi_join_null_equals_null_scales_join_key_nulls() {
+        let join_on = vec![(
+            Arc::new(Column::new("l_key", 0)) as _,
+            Arc::new(Column::new("r_key", 0)) as _,
+        )];
+
+        let result = estimate_join_cardinality(
+            &JoinType::LeftSemi,
+            Statistics {
+                num_rows: Inexact(100),
+                total_byte_size: Absent,
+                column_statistics: vec![create_column_stats(
+                    Absent,
+                    Absent,
+                    Inexact(100),
+                    Exact(20),
+                )],
+            },
+            Statistics {
+                num_rows: Inexact(10),
+                total_byte_size: Absent,
+                column_statistics: vec![create_column_stats(
+                    Absent,
+                    Absent,
+                    Inexact(10),
+                    Absent,
+                )],
+            },
+            &join_on,
+            NullEquality::NullEqualsNull,
+        )
+        .expect("semi join cardinality should be estimated");
+
+        assert_eq!(result.num_rows, 10);
+        assert_eq!(result.column_statistics[0].null_count, Inexact(2));
+        assert_eq!(result.column_statistics[0].distinct_count, Inexact(8));
+    }
+
+    #[test]
+    fn test_semi_join_total_byte_size_absent_if_any_column_byte_size_absent() {
+        let join_on = vec![(
+            Arc::new(Column::new("l_key", 0)) as _,
+            Arc::new(Column::new("r_key", 0)) as _,
+        )];
+
+        let result = estimate_join_cardinality(
+            &JoinType::LeftSemi,
+            Statistics {
+                num_rows: Inexact(100),
+                total_byte_size: Absent,
+                column_statistics: vec![
+                    ColumnStatistics {
+                        null_count: Exact(0),
+                        min_value: Exact(ScalarValue::from(1_i64)),
+                        max_value: Exact(ScalarValue::from(100_i64)),
+                        sum_value: Absent,
+                        distinct_count: Absent,
+                        byte_size: Exact(800),
+                    },
+                    ColumnStatistics {
+                        null_count: Exact(0),
+                        min_value: Absent,
+                        max_value: Absent,
+                        sum_value: Absent,
+                        distinct_count: Absent,
+                        byte_size: Absent,
+                    },
+                ],
+            },
+            Statistics {
+                num_rows: Inexact(10),
+                total_byte_size: Absent,
+                column_statistics: vec![create_column_stats(
+                    Inexact(1),
+                    Inexact(10),
+                    Absent,
+                    Absent,
+                )],
+            },
+            &join_on,
+            NullEquality::NullEqualsNothing,
+        )
+        .expect("semi join cardinality should be estimated");
+
+        assert_eq!(result.num_rows, 10);
+        assert_eq!(result.total_byte_size, Absent);
+    }
+
+    #[test]
+    fn test_anti_join_preserves_join_key_nulls() {
+        let join_on = vec![(
+            Arc::new(Column::new("l_key", 0)) as _,
+            Arc::new(Column::new("r_key", 0)) as _,
+        )];
+
+        let result = estimate_join_cardinality(
+            &JoinType::LeftAnti,
+            Statistics {
+                num_rows: Inexact(1_000_000),
+                total_byte_size: Absent,
+                column_statistics: vec![create_column_stats(
+                    Absent,
+                    Absent,
+                    Inexact(900_000),
+                    Exact(100_000),
+                )],
+            },
+            Statistics {
+                num_rows: Inexact(900_000),
+                total_byte_size: Absent,
+                column_statistics: vec![create_column_stats(
+                    Absent,
+                    Absent,
+                    Inexact(900_000),
+                    Absent,
+                )],
+            },
+            &join_on,
+            NullEquality::NullEqualsNothing,
+        )
+        .expect("anti join cardinality should be estimated");
+
+        assert_eq!(result.num_rows, 100_000);
+        assert_eq!(result.column_statistics[0].null_count, Inexact(100_000));
+        assert_eq!(result.column_statistics[0].distinct_count, Inexact(0));
+    }
+
+    #[test]
+    fn test_anti_join_null_equals_null_scales_join_key_nulls() {
+        let join_on = vec![(
+            Arc::new(Column::new("l_key", 0)) as _,
+            Arc::new(Column::new("r_key", 0)) as _,
+        )];
+
+        let result = estimate_join_cardinality(
+            &JoinType::LeftAnti,
+            Statistics {
+                num_rows: Inexact(100),
+                total_byte_size: Absent,
+                column_statistics: vec![create_column_stats(
+                    Absent,
+                    Absent,
+                    Inexact(100),
+                    Exact(20),
+                )],
+            },
+            Statistics {
+                num_rows: Inexact(10),
+                total_byte_size: Absent,
+                column_statistics: vec![create_column_stats(
+                    Absent,
+                    Absent,
+                    Inexact(10),
+                    Absent,
+                )],
+            },
+            &join_on,
+            NullEquality::NullEqualsNull,
+        )
+        .expect("anti join cardinality should be estimated");
+
+        assert_eq!(result.num_rows, 90);
+        assert_eq!(result.column_statistics[0].null_count, Inexact(18));
+        assert_eq!(result.column_statistics[0].distinct_count, Inexact(72));
+    }
+
+    #[test]
+    fn test_right_semi_join_scales_preserved_column_statistics() {
+        let join_on = vec![(
+            Arc::new(Column::new("l_key", 0)) as _,
+            Arc::new(Column::new("r_key", 0)) as _,
+        )];
+
+        // For a right semi join the right input is preserved, so its column
+        // statistics (and right join-key index) are the ones normalized.
+        let result = estimate_join_cardinality(
+            &JoinType::RightSemi,
+            Statistics {
+                num_rows: Inexact(32),
+                total_byte_size: Absent,
+                column_statistics: vec![create_column_stats(
+                    Inexact(1),
+                    Inexact(32),
+                    Absent,
+                    Absent,
+                )],
+            },
+            Statistics {
+                num_rows: Inexact(432_187),
+                total_byte_size: Absent,
+                column_statistics: vec![
+                    ColumnStatistics {
+                        null_count: Exact(7_196),
+                        min_value: Exact(ScalarValue::from(1_i64)),
+                        max_value: Exact(ScalarValue::from(432_187_i64)),
+                        sum_value: Absent,
+                        distinct_count: Absent,
+                        byte_size: Exact(3_457_496),
+                    },
+                    ColumnStatistics {
+                        null_count: Exact(7_196),
+                        min_value: Exact(ScalarValue::from(1_i64)),
+                        max_value: Exact(ScalarValue::from(432_187_i64)),
+                        sum_value: Exact(ScalarValue::from(1_000_000_i64)),
+                        distinct_count: Exact(500_000),
+                        byte_size: Exact(3_457_496),
+                    },
+                ],
+            },
+            &join_on,
+            NullEquality::NullEqualsNothing,
+        )
+        .expect("right semi join cardinality should be estimated");
+
+        assert_eq!(result.num_rows, 32);
+        // Join-key column: null counts collapse to exact zero (null keys never match).
+        assert_eq!(result.column_statistics[0].null_count, Exact(0));
+        assert_eq!(result.column_statistics[0].byte_size, Inexact(256));
+        // Non-key column: counts scaled to the subset, sum dropped, distinct
+        // capped at the non-null output rows (32 - 1).
+        assert_eq!(result.column_statistics[1].null_count, Inexact(1));
+        assert_eq!(result.column_statistics[1].distinct_count, Inexact(31));
+        assert_eq!(result.column_statistics[1].sum_value, Absent);
+        assert_eq!(result.column_statistics[1].byte_size, Inexact(256));
+    }
+
     #[test]
     fn test_calculate_join_output_ordering() -> Result<()> {
         let left_ordering = LexOrdering::new(vec![


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to