This is an automated email from the ASF dual-hosted git repository.
github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 3c4034c8d0 fix: Scale semi/anti-join column stats by estimated row
count (#22762)
3c4034c8d0 is described below
commit 3c4034c8d0685bb70e8e94f6a8952d309e3b9c41
Author: Neil Conway <[email protected]>
AuthorDate: Sat Jun 6 01:58:53 2026 -0400
fix: Scale semi/anti-join column stats by estimated row count (#22762)
## Which issue does this PR close?
- Closes #22743
## Rationale for this change
This PR makes several related improvements/fixes to the stats code for
semi- and anti-joins:
1. Scale per-column stats using the estimated output row count, rather
than just reusing the stats from the preserved side of the join.
2. Compute `total_byte_size` for semi/anti-join results by summing the
per-column `byte_size` estimates, instead of always emitting `Absent`. We
still emit `Absent` for other join types, and whenever any per-column
`byte_size` value is `Absent`.
3. Pass in the join's `NullEquality` semantics and use them for stats:
under `NullEqualsNothing`, null join keys never match (so a semi join's
null count for those key columns is `Exact(0)`), whereas under
`NullEqualsNull` nulls are treated like any other value.
## What changes are included in this PR?
* Stats improvements described above
* Some refactoring and cleanup
* New unit tests
* Update test expectations where needed
## Are these changes tested?
Yes; new tests added.
## Are there any user-facing changes?
Some queries might get different plans.
---
.../physical_optimizer/partition_statistics.rs | 21 +-
.../physical-plan/src/joins/hash_join/exec.rs | 3 +
.../physical-plan/src/joins/nested_loop_join.rs | 6 +-
.../src/joins/sort_merge_join/exec.rs | 1 +
datafusion/physical-plan/src/joins/utils.rs | 571 +++++++++++++++++++--
5 files changed, 548 insertions(+), 54 deletions(-)
diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs
b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
index 181b7de7d9..4fba94ec3a 100644
--- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs
+++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
@@ -661,39 +661,36 @@ mod test {
let full_statistics = nested_loop_join.partition_statistics(None)?;
// With empty join columns, estimate_join_statistics returns Inexact
row count
// based on the outer side (right side for RightSemi)
- let mut expected_full_statistics = create_partition_statistics(
+ let expected_full_statistics = create_partition_statistics(
4,
32,
1,
4,
Some((DATE_2025_03_01, DATE_2025_03_04)),
- );
- expected_full_statistics.num_rows = Precision::Inexact(4);
- expected_full_statistics.total_byte_size = Precision::Absent;
+ )
+ .to_inexact();
assert_eq!(*full_statistics, expected_full_statistics);
// Test partition_statistics(Some(idx)) - returns partition-specific
statistics
// Partition 1: ids [3,4], dates [2025-03-01, 2025-03-02]
- let mut expected_statistic_partition_1 = create_partition_statistics(
+ let expected_statistic_partition_1 = create_partition_statistics(
2,
16,
3,
4,
Some((DATE_2025_03_01, DATE_2025_03_02)),
- );
- expected_statistic_partition_1.num_rows = Precision::Inexact(2);
- expected_statistic_partition_1.total_byte_size = Precision::Absent;
+ )
+ .to_inexact();
// Partition 2: ids [1,2], dates [2025-03-03, 2025-03-04]
- let mut expected_statistic_partition_2 = create_partition_statistics(
+ let expected_statistic_partition_2 = create_partition_statistics(
2,
16,
1,
2,
Some((DATE_2025_03_03, DATE_2025_03_04)),
- );
- expected_statistic_partition_2.num_rows = Precision::Inexact(2);
- expected_statistic_partition_2.total_byte_size = Precision::Absent;
+ )
+ .to_inexact();
let statistics =
(0..nested_loop_join.output_partitioning().partition_count())
.map(|idx| nested_loop_join.partition_statistics(Some(idx)))
diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs
b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index 03387c316b..3774a30020 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -1448,6 +1448,7 @@ impl ExecutionPlan for HashJoinExec {
Arc::unwrap_or_clone(left_stats),
Arc::unwrap_or_clone(right_stats),
&self.on,
+ self.null_equality,
&self.join_type,
&self.join_schema,
)?
@@ -1463,6 +1464,7 @@ impl ExecutionPlan for HashJoinExec {
Arc::unwrap_or_clone(left_stats),
Arc::unwrap_or_clone(right_stats),
&self.on,
+ self.null_equality,
&self.join_type,
&self.join_schema,
)?
@@ -1480,6 +1482,7 @@ impl ExecutionPlan for HashJoinExec {
Arc::unwrap_or_clone(left_stats),
Arc::unwrap_or_clone(right_stats),
&self.on,
+ self.null_equality,
&self.join_type,
&self.join_schema,
)?
diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs
b/datafusion/physical-plan/src/joins/nested_loop_join.rs
index 15af23b447..a18ec0cbe4 100644
--- a/datafusion/physical-plan/src/joins/nested_loop_join.rs
+++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -61,8 +61,9 @@ use arrow::record_batch::RecordBatch;
use arrow_schema::DataType;
use datafusion_common::cast::as_boolean_array;
use datafusion_common::{
- JoinSide, Result, ScalarValue, Statistics, arrow_err,
assert_eq_or_internal_err,
- internal_datafusion_err, internal_err, project_schema,
unwrap_or_internal_err,
+ JoinSide, NullEquality, Result, ScalarValue, Statistics, arrow_err,
+ assert_eq_or_internal_err, internal_datafusion_err, internal_err,
project_schema,
+ unwrap_or_internal_err,
};
use datafusion_execution::TaskContext;
use datafusion_execution::disk_manager::RefCountedTempFile;
@@ -713,6 +714,7 @@ impl ExecutionPlan for NestedLoopJoinExec {
left_stats,
right_stats,
&join_columns,
+ NullEquality::NullEqualsNothing,
&self.join_type,
&self.join_schema,
)?;
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs
b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs
index 9e87b52696..a86cb647e4 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs
@@ -582,6 +582,7 @@ impl ExecutionPlan for SortMergeJoinExec {
left_stats,
right_stats,
&self.on,
+ self.null_equality,
&self.join_type,
&self.schema,
)?))
diff --git a/datafusion/physical-plan/src/joins/utils.rs
b/datafusion/physical-plan/src/joins/utils.rs
index 5918097194..8cc93dee57 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -417,6 +417,7 @@ impl<T> Clone for OnceFut<T> {
#[derive(Clone, Debug, Default)]
struct PartialJoinStatistics {
pub num_rows: usize,
+ pub total_byte_size: Precision<usize>,
pub column_statistics: Vec<ColumnStatistics>,
}
@@ -430,9 +431,11 @@ struct PartialJoinStatistics {
/// column-level statistics (distinct counts, min/max values) of the join
keys.
/// - **Column statistics**: Combines column statistics from both inputs. For
join types
/// that preserve all columns (Inner, Left, Right, Full), statistics from
both sides
-/// are concatenated. For semi/anti joins, only the relevant side's
statistics are kept.
-/// - **Byte size**: Always returns `Precision::Absent` as join output size is
difficult
-/// to estimate without knowing the actual data.
+/// are concatenated. For semi/anti joins, the preserved side's statistics
are
+/// normalized as subset estimates.
+/// - **Byte size**: For semi/anti joins, sums normalized column byte-size
estimates
+/// when every output column has one. Other join types return
`Precision::Absent`
+/// because join output size is difficult to estimate without knowing the
actual data.
///
/// # The `on` Parameter
///
@@ -446,24 +449,34 @@ struct PartialJoinStatistics {
/// - Does not account for selectivity of arbitrary join filter expressions
/// (e.g., `(t1.v1 + t2.v1) % 2 = 0`). Such filters, common in
NestedLoopJoinExec,
/// are not factored into the cardinality estimation.
-/// - Column statistics for the output are simply combined from inputs without
-/// adjusting for join selectivity (acknowledged in the code as needing
-/// "filter selectivity analysis").
+/// - Column statistics for inner/outer joins are simply combined from inputs
+/// without adjusting for join selectivity (acknowledged in the code as
+/// needing "filter selectivity analysis").
pub(crate) fn estimate_join_statistics(
left_stats: Statistics,
right_stats: Statistics,
on: &JoinOn,
+ null_equality: NullEquality,
join_type: &JoinType,
schema: &Schema,
) -> Result<Statistics> {
- let join_stats = estimate_join_cardinality(join_type, left_stats,
right_stats, on);
- let (num_rows, column_statistics) = match join_stats {
- Some(stats) => (Precision::Inexact(stats.num_rows),
stats.column_statistics),
- None => (Precision::Absent, Statistics::unknown_column(schema)),
+ let join_stats =
+ estimate_join_cardinality(join_type, left_stats, right_stats, on,
null_equality);
+ let (num_rows, total_byte_size, column_statistics) = match join_stats {
+ Some(stats) => (
+ Precision::Inexact(stats.num_rows),
+ stats.total_byte_size,
+ stats.column_statistics,
+ ),
+ None => (
+ Precision::Absent,
+ Precision::Absent,
+ Statistics::unknown_column(schema),
+ ),
};
Ok(Statistics {
num_rows,
- total_byte_size: Precision::Absent,
+ total_byte_size,
column_statistics,
})
}
@@ -474,23 +487,24 @@ fn estimate_join_cardinality(
left_stats: Statistics,
right_stats: Statistics,
on: &JoinOn,
+ null_equality: NullEquality,
) -> Option<PartialJoinStatistics> {
- let (left_key_stats, right_key_stats) = on
+ let on_column_indices = on
.iter()
- .map(|(left, right)| {
- match (
- left.downcast_ref::<Column>(),
- right.downcast_ref::<Column>(),
- ) {
- (Some(left), Some(right)) => (
- left_stats.column_statistics[left.index()].clone(),
- right_stats.column_statistics[right.index()].clone(),
- ),
- _ => (
- ColumnStatistics::new_unknown(),
- ColumnStatistics::new_unknown(),
- ),
- }
+ .map(|(left, right)| equijoin_column_indices(left, right))
+ .collect::<Vec<_>>();
+
+ let (left_key_stats, right_key_stats) = on_column_indices
+ .iter()
+ .map(|indices| match indices {
+ Some((left_index, right_index)) => (
+ left_stats.column_statistics[*left_index].clone(),
+ right_stats.column_statistics[*right_index].clone(),
+ ),
+ None => (
+ ColumnStatistics::new_unknown(),
+ ColumnStatistics::new_unknown(),
+ ),
})
.unzip::<_, _, Vec<_>, Vec<_>>();
@@ -526,6 +540,7 @@ fn estimate_join_cardinality(
Some(PartialJoinStatistics {
num_rows: *cardinality.get_value()?,
+ total_byte_size: Precision::Absent,
// We don't do anything specific here, just combine the
existing
// statistics which might yield subpar results (although it is
// true, esp regarding min/max). For a better estimation, we
need
@@ -547,9 +562,9 @@ fn estimate_join_cardinality(
let (outer_stats, inner_stats, outer_key_stats, inner_key_stats) =
if is_left
{
- (&left_stats, &right_stats, &left_key_stats, &right_key_stats)
+ (left_stats, right_stats, left_key_stats, right_key_stats)
} else {
- (&right_stats, &left_stats, &right_key_stats, &left_key_stats)
+ (right_stats, left_stats, right_key_stats, left_key_stats)
};
let outer_rows = *outer_stats.num_rows.get_value()?;
@@ -575,8 +590,9 @@ fn estimate_join_cardinality(
estimate_semi_join_cardinality(
&outer_stats.num_rows,
&inner_stats.num_rows,
- outer_key_stats,
- inner_key_stats,
+ &outer_key_stats,
+ &inner_key_stats,
+ null_equality,
)
};
@@ -588,10 +604,37 @@ fn estimate_join_cardinality(
(None, _) => outer_rows,
};
- let outer_stats = if is_left { left_stats } else { right_stats };
+ // The outer side is the one whose columns a semi/anti join emits,
so
+ // its statistics are the ones to normalize into the subset
estimate.
+ let Statistics {
+ num_rows: preserved_num_rows,
+ column_statistics: preserved_column_statistics,
+ ..
+ } = outer_stats;
+ let preserved_join_key_indices = on_column_indices
+ .iter()
+ .filter_map(|&indices| {
+ indices.map(
+ |(left_index, right_index)| {
+ if is_left { left_index } else { right_index }
+ },
+ )
+ })
+ .collect::<Vec<_>>();
+ let column_statistics = normalize_semi_anti_join_column_statistics(
+ preserved_column_statistics,
+ &preserved_num_rows,
+ cardinality,
+ &preserved_join_key_indices,
+ is_anti,
+ null_equality,
+ );
+ let total_byte_size =
+ total_byte_size_from_column_statistics(&column_statistics);
Some(PartialJoinStatistics {
num_rows: cardinality,
- column_statistics: outer_stats.column_statistics,
+ total_byte_size,
+ column_statistics,
})
}
@@ -601,6 +644,7 @@ fn estimate_join_cardinality(
column_statistics.push(ColumnStatistics::new_unknown());
Some(PartialJoinStatistics {
num_rows,
+ total_byte_size: Precision::Absent,
column_statistics,
})
}
@@ -610,12 +654,132 @@ fn estimate_join_cardinality(
column_statistics.push(ColumnStatistics::new_unknown());
Some(PartialJoinStatistics {
num_rows,
+ total_byte_size: Precision::Absent,
column_statistics,
})
}
}
}
+fn equijoin_column_indices(
+ left: &PhysicalExprRef,
+ right: &PhysicalExprRef,
+) -> Option<(usize, usize)> {
+ Some((
+ left.downcast_ref::<Column>()?.index(),
+ right.downcast_ref::<Column>()?.index(),
+ ))
+}
+
+/// Adjusts the preserved input's column statistics to describe the subset of
+/// rows a semi or anti join emits. Most values become estimates (marked
+/// inexact) bounded by the smaller output row count:
+///
+/// - `null_count` and `byte_size` are scaled by the output/input row ratio.
+/// - `distinct_count` is capped at the number of non-null output rows.
+/// - `sum_value` is dropped, since the input sum does not apply to the subset.
+///
+/// Join-key columns are the exception for `null_count`: under regular SQL
+/// equality, null keys never match, so a semi join keeps none of those rows
and
+/// an anti join keeps all of them. Under null-equal joins, null keys can match
+/// and are treated like the rest of the subset.
+fn normalize_semi_anti_join_column_statistics(
+ column_statistics: Vec<ColumnStatistics>,
+ input_num_rows: &Precision<usize>,
+ output_num_rows: usize,
+ join_key_indices: &[usize],
+ is_anti: bool,
+ null_equality: NullEquality,
+) -> Vec<ColumnStatistics> {
+ let input_num_rows = input_num_rows.get_value().copied().unwrap_or(0);
+
+ column_statistics
+ .into_iter()
+ .enumerate()
+ .map(|(idx, stats)| {
+ let mut stats = stats.to_inexact();
+ stats.null_count = if join_key_indices.contains(&idx) {
+ normalize_semi_anti_join_key_null_count(
+ stats.null_count,
+ input_num_rows,
+ output_num_rows,
+ is_anti,
+ null_equality,
+ )
+ } else {
+ scale_subset_count(stats.null_count, input_num_rows,
output_num_rows)
+ .min(&Precision::Inexact(output_num_rows))
+ };
+ let max_distinct_count = stats
+ .null_count
+ .get_value()
+ .map(|null_count| output_num_rows.saturating_sub(*null_count))
+ .unwrap_or(output_num_rows);
+ stats.distinct_count = stats
+ .distinct_count
+ .min(&Precision::Inexact(max_distinct_count));
+ stats.byte_size =
+ scale_subset_count(stats.byte_size, input_num_rows,
output_num_rows);
+ stats.sum_value = Precision::Absent;
+ stats
+ })
+ .collect()
+}
+
+fn normalize_semi_anti_join_key_null_count(
+ null_count: Precision<usize>,
+ input_num_rows: usize,
+ output_num_rows: usize,
+ is_anti: bool,
+ null_equality: NullEquality,
+) -> Precision<usize> {
+ match (is_anti, null_equality) {
+ (false, NullEquality::NullEqualsNothing) => Precision::Exact(0),
+ (true, NullEquality::NullEqualsNothing) => null_count
+ .to_inexact()
+ .min(&Precision::Inexact(output_num_rows)),
+ (_, NullEquality::NullEqualsNull) => {
+ scale_subset_count(null_count, input_num_rows, output_num_rows)
+ .min(&Precision::Inexact(output_num_rows))
+ }
+ }
+}
+
+// Scale a column-level count to an estimated row subset. Rounding up keeps a
+// small non-zero count from disappearing solely because the subset is small.
+fn scale_subset_count(
+ count: Precision<usize>,
+ input_num_rows: usize,
+ output_num_rows: usize,
+) -> Precision<usize> {
+ let scaled = match count {
+ Precision::Exact(count) | Precision::Inexact(count) => {
+ if input_num_rows == 0 {
+ 0
+ } else {
+ (count as u128 * output_num_rows as
u128).div_ceil(input_num_rows as u128)
+ as usize
+ }
+ }
+ Precision::Absent => return Precision::Absent,
+ };
+
+ Precision::Inexact(scaled)
+}
+
+fn total_byte_size_from_column_statistics(
+ column_statistics: &[ColumnStatistics],
+) -> Precision<usize> {
+ column_statistics
+ .iter()
+ .map(|stats| stats.byte_size.get_value().copied())
+ .try_fold(0usize, |acc, byte_size| {
+ byte_size.map(|byte_size| acc.saturating_add(byte_size))
+ })
+ .map(Precision::Inexact)
+ .unwrap_or(Precision::Absent)
+}
+
/// Estimate the inner join cardinality by using the basic building blocks of
/// column-level statistics and the total row count. This is a very naive and
/// a very conservative implementation that can quickly give up if there is not
@@ -640,6 +804,13 @@ fn estimate_inner_join_cardinality(
..
} = right_stats;
+ if left_num_rows == Precision::Exact(0) || right_num_rows ==
Precision::Exact(0) {
+ return Some(Precision::Exact(0));
+ }
+ if left_num_rows == Precision::Inexact(0) || right_num_rows ==
Precision::Inexact(0) {
+ return Some(Precision::Inexact(0));
+ }
+
// Follow Spark Catalyst's conservative NDV join estimate: for multi-key
// joins, use the most selective key instead of multiplying all key
denominators.
let mut join_selectivity = Precision::Absent;
@@ -743,8 +914,8 @@ fn estimate_disjoint_inputs(
/// Under the uniformity assumption (each distinct value contributes
/// equally to row counts), the surviving fraction of outer rows is:
///
-/// Null rows cannot match, so each column's selectivity is further
-/// reduced by the outer null fraction:
+/// Under regular SQL equality, null rows cannot match, so each column's
+/// selectivity is further reduced by the outer null fraction:
///
/// ```text
/// null_frac_i = outer_null_count_i / outer_rows
@@ -761,7 +932,7 @@ fn estimate_disjoint_inputs(
/// Anti join cardinality is derived as the complement:
/// `outer_rows - semi_cardinality`.
///
-/// Boundary cases:
+/// With `NullEqualsNothing`, boundary cases are:
/// * `inner_ndv >= outer_ndv` → selectivity = `1.0 - null_frac`
/// * `null_frac = 1.0` → selectivity = 0.0 (no non-null rows can match)
/// * Missing NDV statistics → returns `None` (fallback to `outer_rows`)
@@ -776,6 +947,7 @@ fn estimate_semi_join_cardinality(
inner_num_rows: &Precision<usize>,
outer_key_stats: &[ColumnStatistics],
inner_key_stats: &[ColumnStatistics],
+ null_equality: NullEquality,
) -> Option<usize> {
let outer_rows = *outer_num_rows.get_value()?;
if outer_rows == 0 {
@@ -806,11 +978,21 @@ fn estimate_semi_join_cardinality(
if let (Some(&o), Some(&i)) = (outer_ndv.get_value(),
inner_ndv.get_value())
&& o > 0
{
- let null_frac = outer_stat
- .null_count
- .get_value()
- .map(|&nc| nc as f64 / outer_rows as f64)
- .unwrap_or(0.0);
+ let null_frac = if null_equality ==
NullEquality::NullEqualsNothing {
+ outer_stat
+ .null_count
+ .get_value()
+ .map(|&nc| {
+ if nc > outer_rows {
+ 0.0
+ } else {
+ nc as f64 / outer_rows as f64
+ }
+ })
+ .unwrap_or(0.0)
+ } else {
+ 0.0
+ };
selectivity *= (o.min(i) as f64) / (o as f64) * (1.0 - null_frac);
has_selectivity_estimate = true;
}
@@ -2590,6 +2772,7 @@ mod tests {
create_stats(Some(left_num_rows), left_col_stats.clone(),
false),
create_stats(Some(right_num_rows), right_col_stats.clone(),
false),
&join_on,
+ NullEquality::NullEqualsNothing,
);
assert_eq!(
@@ -2722,6 +2905,7 @@ mod tests {
create_stats(Some(1000), left_col_stats.clone(), false),
create_stats(Some(2000), right_col_stats.clone(), false),
&join_on,
+ NullEquality::NullEqualsNothing,
)
.unwrap();
assert_eq!(partial_join_stats.num_rows, expected_num_rows);
@@ -2775,6 +2959,7 @@ mod tests {
create_stats(Some(1000), left_col_stats.clone(), false),
create_stats(Some(2000), right_col_stats.clone(), false),
&join_on_ab,
+ NullEquality::NullEqualsNothing,
)
.unwrap();
let stats_ba = estimate_join_cardinality(
@@ -2782,6 +2967,7 @@ mod tests {
create_stats(Some(1000), left_col_stats.clone(), false),
create_stats(Some(2000), right_col_stats.clone(), false),
&join_on_ba,
+ NullEquality::NullEqualsNothing,
)
.unwrap();
@@ -2855,6 +3041,7 @@ mod tests {
create_stats(Some(1000), left_col_stats.clone(), true),
create_stats(Some(2000), right_col_stats.clone(), true),
&join_on,
+ NullEquality::NullEqualsNothing,
)
.unwrap();
assert_eq!(partial_join_stats.num_rows, expected_num_rows);
@@ -3090,6 +3277,7 @@ mod tests {
column_statistics: inner_col_stats,
},
&join_on,
+ NullEquality::NullEqualsNothing,
)
.map(|cardinality| cardinality.num_rows);
@@ -3124,6 +3312,7 @@ mod tests {
column_statistics: dummy_column_stats.clone(),
},
&join_on,
+ NullEquality::NullEqualsNothing,
);
assert!(
absent_outer_estimation.is_none(),
@@ -3143,6 +3332,7 @@ mod tests {
column_statistics: dummy_column_stats.clone(),
},
&join_on,
+ NullEquality::NullEqualsNothing,
).expect("Expected non-empty PartialJoinStatistics for SemiJoin with
absent inner num_rows");
assert_eq!(
@@ -3163,6 +3353,7 @@ mod tests {
column_statistics: dummy_column_stats,
},
&join_on,
+ NullEquality::NullEqualsNothing,
);
assert!(
absent_inner_estimation.is_none(),
@@ -3209,6 +3400,7 @@ mod tests {
],
},
&join_on,
+ NullEquality::NullEqualsNothing,
)
.map(|c| c.num_rows);
assert_eq!(result, Some(13), "multi-column semi join");
@@ -3233,6 +3425,7 @@ mod tests {
],
},
&join_on,
+ NullEquality::NullEqualsNothing,
)
.map(|c| c.num_rows);
assert_eq!(result, Some(87), "multi-column anti join");
@@ -3260,6 +3453,7 @@ mod tests {
],
},
&join_on,
+ NullEquality::NullEqualsNothing,
)
.map(|c| c.num_rows);
assert_eq!(result, Some(50), "mixed stats: col1 skipped");
@@ -3284,6 +3478,7 @@ mod tests {
],
},
&join_on,
+ NullEquality::NullEqualsNothing,
)
.map(|c| c.num_rows);
assert_eq!(result, Some(100), "no column has stats on both sides");
@@ -3312,6 +3507,7 @@ mod tests {
],
},
&join_on,
+ NullEquality::NullEqualsNothing,
)
.map(|c| c.num_rows);
assert_eq!(
@@ -3353,6 +3549,7 @@ mod tests {
left_stats.clone(),
right_stats.clone(),
&join_on,
+ NullEquality::NullEqualsNothing,
)
.map(|c| c.num_rows);
assert_eq!(left_semi, Some(50));
@@ -3362,11 +3559,305 @@ mod tests {
left_stats,
right_stats,
&join_on,
+ NullEquality::NullEqualsNothing,
)
.map(|c| c.num_rows);
assert_eq!(left_anti, Some(0));
}
+ #[test]
+ fn test_semi_join_scales_preserved_column_statistics() {
+ let join_on = vec![(
+ Arc::new(Column::new("l_key", 0)) as _,
+ Arc::new(Column::new("r_key", 0)) as _,
+ )];
+
+ let result = estimate_join_cardinality(
+ &JoinType::LeftSemi,
+ Statistics {
+ num_rows: Inexact(432_187),
+ total_byte_size: Absent,
+ column_statistics: vec![
+ ColumnStatistics {
+ null_count: Exact(7_196),
+ min_value: Exact(ScalarValue::from(1_i64)),
+ max_value: Exact(ScalarValue::from(432_187_i64)),
+ sum_value: Absent,
+ distinct_count: Absent,
+ byte_size: Exact(3_457_496),
+ },
+ ColumnStatistics {
+ null_count: Exact(7_196),
+ min_value: Exact(ScalarValue::from(1_i64)),
+ max_value: Exact(ScalarValue::from(432_187_i64)),
+ sum_value: Exact(ScalarValue::from(1_000_000_i64)),
+ distinct_count: Exact(500_000),
+ byte_size: Exact(3_457_496),
+ },
+ ],
+ },
+ Statistics {
+ num_rows: Inexact(32),
+ total_byte_size: Absent,
+ column_statistics: vec![create_column_stats(
+ Inexact(1),
+ Inexact(32),
+ Absent,
+ Absent,
+ )],
+ },
+ &join_on,
+ NullEquality::NullEqualsNothing,
+ )
+ .expect("semi join cardinality should be estimated");
+
+ assert_eq!(result.num_rows, 32);
+ assert_eq!(result.total_byte_size, Inexact(512));
+ assert_eq!(result.column_statistics[0].null_count, Exact(0));
+ assert_eq!(result.column_statistics[0].distinct_count, Absent);
+ assert_eq!(
+ result.column_statistics[0].min_value,
+ Inexact(ScalarValue::from(1_i64))
+ );
+ assert_eq!(
+ result.column_statistics[0].max_value,
+ Inexact(ScalarValue::from(432_187_i64))
+ );
+ assert_eq!(result.column_statistics[0].byte_size, Inexact(256));
+ assert_eq!(result.column_statistics[1].null_count, Inexact(1));
+ // distinct_count is capped at the non-null output rows (32 - 1).
+ assert_eq!(result.column_statistics[1].distinct_count, Inexact(31));
+ assert_eq!(result.column_statistics[1].sum_value, Absent);
+ assert_eq!(result.column_statistics[1].byte_size, Inexact(256));
+ }
+
+ #[test]
+ fn test_semi_join_null_equals_null_scales_join_key_nulls() {
+ let join_on = vec![(
+ Arc::new(Column::new("l_key", 0)) as _,
+ Arc::new(Column::new("r_key", 0)) as _,
+ )];
+
+ let result = estimate_join_cardinality(
+ &JoinType::LeftSemi,
+ Statistics {
+ num_rows: Inexact(100),
+ total_byte_size: Absent,
+ column_statistics: vec![create_column_stats(
+ Absent,
+ Absent,
+ Inexact(100),
+ Exact(20),
+ )],
+ },
+ Statistics {
+ num_rows: Inexact(10),
+ total_byte_size: Absent,
+ column_statistics: vec![create_column_stats(
+ Absent,
+ Absent,
+ Inexact(10),
+ Absent,
+ )],
+ },
+ &join_on,
+ NullEquality::NullEqualsNull,
+ )
+ .expect("semi join cardinality should be estimated");
+
+ assert_eq!(result.num_rows, 10);
+ assert_eq!(result.column_statistics[0].null_count, Inexact(2));
+ assert_eq!(result.column_statistics[0].distinct_count, Inexact(8));
+ }
+
+ #[test]
+ fn test_semi_join_total_byte_size_absent_if_any_column_byte_size_absent() {
+ let join_on = vec![(
+ Arc::new(Column::new("l_key", 0)) as _,
+ Arc::new(Column::new("r_key", 0)) as _,
+ )];
+
+ let result = estimate_join_cardinality(
+ &JoinType::LeftSemi,
+ Statistics {
+ num_rows: Inexact(100),
+ total_byte_size: Absent,
+ column_statistics: vec![
+ ColumnStatistics {
+ null_count: Exact(0),
+ min_value: Exact(ScalarValue::from(1_i64)),
+ max_value: Exact(ScalarValue::from(100_i64)),
+ sum_value: Absent,
+ distinct_count: Absent,
+ byte_size: Exact(800),
+ },
+ ColumnStatistics {
+ null_count: Exact(0),
+ min_value: Absent,
+ max_value: Absent,
+ sum_value: Absent,
+ distinct_count: Absent,
+ byte_size: Absent,
+ },
+ ],
+ },
+ Statistics {
+ num_rows: Inexact(10),
+ total_byte_size: Absent,
+ column_statistics: vec![create_column_stats(
+ Inexact(1),
+ Inexact(10),
+ Absent,
+ Absent,
+ )],
+ },
+ &join_on,
+ NullEquality::NullEqualsNothing,
+ )
+ .expect("semi join cardinality should be estimated");
+
+ assert_eq!(result.num_rows, 10);
+ assert_eq!(result.total_byte_size, Absent);
+ }
+
+ #[test]
+ fn test_anti_join_preserves_join_key_nulls() {
+ let join_on = vec![(
+ Arc::new(Column::new("l_key", 0)) as _,
+ Arc::new(Column::new("r_key", 0)) as _,
+ )];
+
+ let result = estimate_join_cardinality(
+ &JoinType::LeftAnti,
+ Statistics {
+ num_rows: Inexact(1_000_000),
+ total_byte_size: Absent,
+ column_statistics: vec![create_column_stats(
+ Absent,
+ Absent,
+ Inexact(900_000),
+ Exact(100_000),
+ )],
+ },
+ Statistics {
+ num_rows: Inexact(900_000),
+ total_byte_size: Absent,
+ column_statistics: vec![create_column_stats(
+ Absent,
+ Absent,
+ Inexact(900_000),
+ Absent,
+ )],
+ },
+ &join_on,
+ NullEquality::NullEqualsNothing,
+ )
+ .expect("anti join cardinality should be estimated");
+
+ assert_eq!(result.num_rows, 100_000);
+ assert_eq!(result.column_statistics[0].null_count, Inexact(100_000));
+ assert_eq!(result.column_statistics[0].distinct_count, Inexact(0));
+ }
+
+ #[test]
+ fn test_anti_join_null_equals_null_scales_join_key_nulls() {
+ let join_on = vec![(
+ Arc::new(Column::new("l_key", 0)) as _,
+ Arc::new(Column::new("r_key", 0)) as _,
+ )];
+
+ let result = estimate_join_cardinality(
+ &JoinType::LeftAnti,
+ Statistics {
+ num_rows: Inexact(100),
+ total_byte_size: Absent,
+ column_statistics: vec![create_column_stats(
+ Absent,
+ Absent,
+ Inexact(100),
+ Exact(20),
+ )],
+ },
+ Statistics {
+ num_rows: Inexact(10),
+ total_byte_size: Absent,
+ column_statistics: vec![create_column_stats(
+ Absent,
+ Absent,
+ Inexact(10),
+ Absent,
+ )],
+ },
+ &join_on,
+ NullEquality::NullEqualsNull,
+ )
+ .expect("anti join cardinality should be estimated");
+
+ assert_eq!(result.num_rows, 90);
+ assert_eq!(result.column_statistics[0].null_count, Inexact(18));
+ assert_eq!(result.column_statistics[0].distinct_count, Inexact(72));
+ }
+
+ #[test]
+ fn test_right_semi_join_scales_preserved_column_statistics() {
+ let join_on = vec![(
+ Arc::new(Column::new("l_key", 0)) as _,
+ Arc::new(Column::new("r_key", 0)) as _,
+ )];
+
+ // For a right semi join the right input is preserved, so its column
+ // statistics (and right join-key index) are the ones normalized.
+ let result = estimate_join_cardinality(
+ &JoinType::RightSemi,
+ Statistics {
+ num_rows: Inexact(32),
+ total_byte_size: Absent,
+ column_statistics: vec![create_column_stats(
+ Inexact(1),
+ Inexact(32),
+ Absent,
+ Absent,
+ )],
+ },
+ Statistics {
+ num_rows: Inexact(432_187),
+ total_byte_size: Absent,
+ column_statistics: vec![
+ ColumnStatistics {
+ null_count: Exact(7_196),
+ min_value: Exact(ScalarValue::from(1_i64)),
+ max_value: Exact(ScalarValue::from(432_187_i64)),
+ sum_value: Absent,
+ distinct_count: Absent,
+ byte_size: Exact(3_457_496),
+ },
+ ColumnStatistics {
+ null_count: Exact(7_196),
+ min_value: Exact(ScalarValue::from(1_i64)),
+ max_value: Exact(ScalarValue::from(432_187_i64)),
+ sum_value: Exact(ScalarValue::from(1_000_000_i64)),
+ distinct_count: Exact(500_000),
+ byte_size: Exact(3_457_496),
+ },
+ ],
+ },
+ &join_on,
+ NullEquality::NullEqualsNothing,
+ )
+ .expect("right semi join cardinality should be estimated");
+
+ assert_eq!(result.num_rows, 32);
+ // Join-key column: null counts collapse to exact zero (null keys
never match).
+ assert_eq!(result.column_statistics[0].null_count, Exact(0));
+ assert_eq!(result.column_statistics[0].byte_size, Inexact(256));
+ // Non-key column: counts scaled to the subset, sum dropped, distinct
+ // capped at the non-null output rows (32 - 1).
+ assert_eq!(result.column_statistics[1].null_count, Inexact(1));
+ assert_eq!(result.column_statistics[1].distinct_count, Inexact(31));
+ assert_eq!(result.column_statistics[1].sum_value, Absent);
+ assert_eq!(result.column_statistics[1].byte_size, Inexact(256));
+ }
+
#[test]
fn test_calculate_join_output_ordering() -> Result<()> {
let left_ordering = LexOrdering::new(vec![
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]