This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 6e0dde0890 fix(stats): widen sum_value integer arithmetic to
SUM-compatible types (#20865)
6e0dde0890 is described below
commit 6e0dde0890ef967183f5fa828195a44cbf99b870
Author: Kumar Ujjawal <[email protected]>
AuthorDate: Wed Mar 25 20:49:26 2026 +0530
fix(stats): widen sum_value integer arithmetic to SUM-compatible types
(#20865)
## Which issue does this PR close?
<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax. For example
`Closes #123` indicates that this PR will close issue #123.
-->
- Closes #20826.
## Rationale for this change
As discussed in the review thread on #20768 and tracked by #20826,
`sum_value` should not keep narrow integer column types during stats
aggregation, because merge/multiply paths can overflow before values are
widened.
<!--
Why are you proposing this change? If this is already explained clearly
in the issue then this section is not needed.
Explaining clearly why changes are proposed helps reviewers understand
your changes and offer better suggestions for fixes.
-->
## What changes are included in this PR?
This PR updates statistics `sum_value` arithmetic to match SUM-style
widening for small integer types, and applies that behavior consistently
across merge and multiplication paths.
<!--
There is no need to duplicate the description in the issue here but it
is sometimes worth providing a summary of the individual changes in this
PR.
-->
## Are these changes tested?
Yes
<!--
We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
2. Serve as another way to document the expected behavior of the code
If tests are not included in your PR, please explain why (for example,
are they covered by existing tests)?
-->
## Are there any user-facing changes?
<!--
If there are user-facing changes then we may require documentation to be
updated before approving the PR.
-->
<!--
If there are any breaking changes to public APIs, please add the `api
change` label.
-->
---
datafusion/common/src/stats.rs | 133 +++++++++++++++++++++--
datafusion/datasource/src/statistics.rs | 81 +++++++++++++-
datafusion/physical-expr/src/projection.rs | 42 ++++++-
datafusion/physical-plan/src/joins/cross_join.rs | 85 +++++++++++----
datafusion/physical-plan/src/union.rs | 2 +-
5 files changed, 304 insertions(+), 39 deletions(-)
diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs
index 7d3d511ca7..4cf5cc3661 100644
--- a/datafusion/common/src/stats.rs
+++ b/datafusion/common/src/stats.rs
@@ -203,6 +203,24 @@ impl Precision<usize> {
}
impl Precision<ScalarValue> {
+ fn sum_data_type(data_type: &DataType) -> DataType {
+ match data_type {
+ DataType::Int8 | DataType::Int16 | DataType::Int32 =>
DataType::Int64,
+ DataType::UInt8 | DataType::UInt16 | DataType::UInt32 =>
DataType::UInt64,
+ _ => data_type.clone(),
+ }
+ }
+
+ fn cast_scalar_to_sum_type(value: &ScalarValue) -> Result<ScalarValue> {
+ let source_type = value.data_type();
+ let target_type = Self::sum_data_type(&source_type);
+ if source_type == target_type {
+ Ok(value.clone())
+ } else {
+ value.cast_to(&target_type)
+ }
+ }
+
/// Calculates the sum of two (possibly inexact) [`ScalarValue`] values,
/// conservatively propagating exactness information. If one of the input
/// values is [`Precision::Absent`], the result is `Absent` too.
@@ -228,6 +246,31 @@ impl Precision<ScalarValue> {
}
}
+ /// Casts integer values to the wider SQL `SUM` return type.
+ ///
+ /// This reduces overflow risk when `sum_value` statistics are merged:
+ /// `Int8/Int16/Int32 -> Int64` and `UInt8/UInt16/UInt32 -> UInt64`.
+ pub fn cast_to_sum_type(&self) -> Precision<ScalarValue> {
+ match (self.is_exact(), self.get_value()) {
+ (Some(true), Some(value)) => Self::cast_scalar_to_sum_type(value)
+ .map(Precision::Exact)
+ .unwrap_or(Precision::Absent),
+ (Some(false), Some(value)) => Self::cast_scalar_to_sum_type(value)
+ .map(Precision::Inexact)
+ .unwrap_or(Precision::Absent),
+ (_, _) => Precision::Absent,
+ }
+ }
+
+ /// SUM-style addition with integer widening to match SQL `SUM` return
+ /// types for smaller integral inputs.
+ pub fn add_for_sum(&self, other: &Precision<ScalarValue>) ->
Precision<ScalarValue> {
+ let mut lhs = self.cast_to_sum_type();
+ let rhs = other.cast_to_sum_type();
+ precision_add(&mut lhs, &rhs);
+ lhs
+ }
+
/// Calculates the difference of two (possibly inexact) [`ScalarValue`]
values,
/// conservatively propagating exactness information. If one of the input
/// values is [`Precision::Absent`], the result is `Absent` too.
@@ -620,7 +663,7 @@ impl Statistics {
/// assert_eq!(merged.column_statistics[0].max_value,
/// Precision::Exact(ScalarValue::from(200)));
/// assert_eq!(merged.column_statistics[0].sum_value,
- /// Precision::Exact(ScalarValue::from(1500)));
+ /// Precision::Exact(ScalarValue::Int64(Some(1500))));
/// ```
pub fn try_merge_iter<'a, I>(items: I, schema: &Schema) ->
Result<Statistics>
where
@@ -664,7 +707,7 @@ impl Statistics {
null_count: cs.null_count,
max_value: cs.max_value.clone(),
min_value: cs.min_value.clone(),
- sum_value: cs.sum_value.clone(),
+ sum_value: cs.sum_value.cast_to_sum_type(),
distinct_count: cs.distinct_count,
byte_size: cs.byte_size,
})
@@ -693,7 +736,8 @@ impl Statistics {
};
col_stats.min_value =
col_stats.min_value.min(&item_cs.min_value);
col_stats.max_value =
col_stats.max_value.max(&item_cs.max_value);
- precision_add(&mut col_stats.sum_value, &item_cs.sum_value);
+ let item_sum_value = item_cs.sum_value.cast_to_sum_type();
+ precision_add(&mut col_stats.sum_value, &item_sum_value);
col_stats.byte_size =
col_stats.byte_size.add(&item_cs.byte_size);
}
}
@@ -877,7 +921,15 @@ pub struct ColumnStatistics {
pub max_value: Precision<ScalarValue>,
/// Minimum value of column
pub min_value: Precision<ScalarValue>,
- /// Sum value of a column
+ /// Sum value of a column.
+ ///
+ /// For integral columns, values should be kept in SUM-compatible widened
+ /// types (`Int8/Int16/Int32 -> Int64`, `UInt8/UInt16/UInt32 -> UInt64`) to
+ /// reduce overflow risk during statistics propagation.
+ ///
+ /// Callers should prefer [`ColumnStatistics::with_sum_value`] for setting
+ /// this field and [`Precision<ScalarValue>::add_for_sum`] /
+ /// [`Precision<ScalarValue>::cast_to_sum_type`] for sum arithmetic.
pub sum_value: Precision<ScalarValue>,
/// Number of distinct values
pub distinct_count: Precision<usize>,
@@ -942,7 +994,19 @@ impl ColumnStatistics {
/// Set the sum value
pub fn with_sum_value(mut self, sum_value: Precision<ScalarValue>) -> Self
{
- self.sum_value = sum_value;
+ self.sum_value = match sum_value {
+ Precision::Exact(value) => {
+ Precision::<ScalarValue>::cast_scalar_to_sum_type(&value)
+ .map(Precision::Exact)
+ .unwrap_or(Precision::Absent)
+ }
+ Precision::Inexact(value) => {
+ Precision::<ScalarValue>::cast_scalar_to_sum_type(&value)
+ .map(Precision::Inexact)
+ .unwrap_or(Precision::Absent)
+ }
+ Precision::Absent => Precision::Absent,
+ };
self
}
@@ -1095,6 +1159,45 @@ mod tests {
assert_eq!(precision.add(&Precision::Absent), Precision::Absent);
}
+ #[test]
+ fn test_add_for_sum_scalar_integer_widening() {
+ let precision = Precision::Exact(ScalarValue::Int32(Some(42)));
+
+ assert_eq!(
+
precision.add_for_sum(&Precision::Exact(ScalarValue::Int32(Some(23)))),
+ Precision::Exact(ScalarValue::Int64(Some(65))),
+ );
+ assert_eq!(
+
precision.add_for_sum(&Precision::Inexact(ScalarValue::Int32(Some(23)))),
+ Precision::Inexact(ScalarValue::Int64(Some(65))),
+ );
+ }
+
+ #[test]
+ fn test_add_for_sum_prevents_int32_overflow() {
+ let lhs = Precision::Exact(ScalarValue::Int32(Some(i32::MAX)));
+ let rhs = Precision::Exact(ScalarValue::Int32(Some(1)));
+
+ assert_eq!(
+ lhs.add_for_sum(&rhs),
+ Precision::Exact(ScalarValue::Int64(Some(i64::from(i32::MAX) +
1))),
+ );
+ }
+
+ #[test]
+ fn test_add_for_sum_scalar_unsigned_integer_widening() {
+ let precision = Precision::Exact(ScalarValue::UInt32(Some(42)));
+
+ assert_eq!(
+
precision.add_for_sum(&Precision::Exact(ScalarValue::UInt32(Some(23)))),
+ Precision::Exact(ScalarValue::UInt64(Some(65))),
+ );
+ assert_eq!(
+
precision.add_for_sum(&Precision::Inexact(ScalarValue::UInt32(Some(23)))),
+ Precision::Inexact(ScalarValue::UInt64(Some(65))),
+ );
+ }
+
#[test]
fn test_sub() {
let precision1 = Precision::Exact(42);
@@ -1340,7 +1443,7 @@ mod tests {
);
assert_eq!(
col1_stats.sum_value,
- Precision::Exact(ScalarValue::Int32(Some(1100)))
+ Precision::Exact(ScalarValue::Int64(Some(1100)))
); // 500 + 600
let col2_stats = &summary_stats.column_statistics[1];
@@ -1355,7 +1458,7 @@ mod tests {
);
assert_eq!(
col2_stats.sum_value,
- Precision::Exact(ScalarValue::Int32(Some(2200)))
+ Precision::Exact(ScalarValue::Int64(Some(2200)))
); // 1000 + 1200
}
@@ -1997,6 +2100,16 @@ mod tests {
assert_eq!(col_stats.byte_size, Precision::Exact(8192));
}
+ #[test]
+ fn test_with_sum_value_builder_widens_small_integers() {
+ let col_stats = ColumnStatistics::new_unknown()
+ .with_sum_value(Precision::Exact(ScalarValue::UInt32(Some(123))));
+ assert_eq!(
+ col_stats.sum_value,
+ Precision::Exact(ScalarValue::UInt64(Some(123)))
+ );
+ }
+
#[test]
fn test_with_fetch_scales_byte_size() {
// Test that byte_size is scaled by the row ratio in with_fetch
@@ -2144,7 +2257,7 @@ mod tests {
);
assert_eq!(
col1_stats.sum_value,
- Precision::Exact(ScalarValue::Int32(Some(1100)))
+ Precision::Exact(ScalarValue::Int64(Some(1100)))
);
let col2_stats = &summary_stats.column_statistics[1];
@@ -2159,7 +2272,7 @@ mod tests {
);
assert_eq!(
col2_stats.sum_value,
- Precision::Exact(ScalarValue::Int32(Some(2200)))
+ Precision::Exact(ScalarValue::Int64(Some(2200)))
);
}
@@ -2508,7 +2621,7 @@ mod tests {
);
assert_eq!(
col_stats.sum_value,
- Precision::Inexact(ScalarValue::Int32(Some(1500)))
+ Precision::Inexact(ScalarValue::Int64(Some(1500)))
);
}
}
diff --git a/datafusion/datasource/src/statistics.rs
b/datafusion/datasource/src/statistics.rs
index b1a56e096c..e5a1e4613b 100644
--- a/datafusion/datasource/src/statistics.rs
+++ b/datafusion/datasource/src/statistics.rs
@@ -293,7 +293,7 @@ fn sort_columns_from_physical_sort_exprs(
since = "47.0.0",
note = "Please use `get_files_with_limit` and
`compute_all_files_statistics` instead"
)]
-#[expect(unused)]
+#[cfg_attr(not(test), expect(unused))]
pub async fn get_statistics_with_limit(
all_files: impl Stream<Item = Result<(PartitionedFile, Arc<Statistics>)>>,
file_schema: SchemaRef,
@@ -329,7 +329,7 @@ pub async fn get_statistics_with_limit(
col_stats_set[index].null_count = file_column.null_count;
col_stats_set[index].max_value = file_column.max_value;
col_stats_set[index].min_value = file_column.min_value;
- col_stats_set[index].sum_value = file_column.sum_value;
+ col_stats_set[index].sum_value =
file_column.sum_value.cast_to_sum_type();
}
// If the number of rows exceeds the limit, we can stop processing
@@ -374,7 +374,7 @@ pub async fn get_statistics_with_limit(
col_stats.null_count = col_stats.null_count.add(file_nc);
col_stats.max_value = col_stats.max_value.max(file_max);
col_stats.min_value = col_stats.min_value.min(file_min);
- col_stats.sum_value = col_stats.sum_value.add(file_sum);
+ col_stats.sum_value =
col_stats.sum_value.add_for_sum(file_sum);
col_stats.byte_size = col_stats.byte_size.add(file_sbs);
}
@@ -497,3 +497,78 @@ pub fn add_row_stats(
) -> Precision<usize> {
file_num_rows.add(&num_rows)
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::PartitionedFile;
+ use arrow::datatypes::{DataType, Field, Schema};
+ use futures::stream;
+
+ fn file_stats(sum: u32) -> Statistics {
+ Statistics {
+ num_rows: Precision::Exact(1),
+ total_byte_size: Precision::Exact(4),
+ column_statistics: vec![ColumnStatistics {
+ null_count: Precision::Exact(0),
+ max_value: Precision::Exact(ScalarValue::UInt32(Some(sum))),
+ min_value: Precision::Exact(ScalarValue::UInt32(Some(sum))),
+ sum_value: Precision::Exact(ScalarValue::UInt32(Some(sum))),
+ distinct_count: Precision::Exact(1),
+ byte_size: Precision::Exact(4),
+ }],
+ }
+ }
+
+ #[tokio::test]
+ #[expect(deprecated)]
+ async fn test_get_statistics_with_limit_casts_first_file_sum_to_sum_type()
+ -> Result<()> {
+ let schema =
+ Arc::new(Schema::new(vec![Field::new("c1", DataType::UInt32,
true)]));
+
+ let files = stream::iter(vec![Ok((
+ PartitionedFile::new("f1.parquet", 1),
+ Arc::new(file_stats(100)),
+ ))]);
+
+ let (_group, stats) =
+ get_statistics_with_limit(files, schema, None, false).await?;
+
+ assert_eq!(
+ stats.column_statistics[0].sum_value,
+ Precision::Exact(ScalarValue::UInt64(Some(100)))
+ );
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ #[expect(deprecated)]
+ async fn test_get_statistics_with_limit_merges_sum_with_unsigned_widening()
+ -> Result<()> {
+ let schema =
+ Arc::new(Schema::new(vec![Field::new("c1", DataType::UInt32,
true)]));
+
+ let files = stream::iter(vec![
+ Ok((
+ PartitionedFile::new("f1.parquet", 1),
+ Arc::new(file_stats(100)),
+ )),
+ Ok((
+ PartitionedFile::new("f2.parquet", 1),
+ Arc::new(file_stats(200)),
+ )),
+ ]);
+
+ let (_group, stats) =
+ get_statistics_with_limit(files, schema, None, true).await?;
+
+ assert_eq!(
+ stats.column_statistics[0].sum_value,
+ Precision::Exact(ScalarValue::UInt64(Some(300)))
+ );
+
+ Ok(())
+ }
+}
diff --git a/datafusion/physical-expr/src/projection.rs
b/datafusion/physical-expr/src/projection.rs
index b9f98c03da..e133e5a849 100644
--- a/datafusion/physical-expr/src/projection.rs
+++ b/datafusion/physical-expr/src/projection.rs
@@ -693,12 +693,15 @@ impl ProjectionExprs {
Precision::Absent
};
- let sum_value =
Precision::<ScalarValue>::from(stats.num_rows)
- .cast_to(&value.data_type())
- .ok()
- .map(|row_count| {
-
Precision::Exact(value.clone()).multiply(&row_count)
+ let widened_sum =
Precision::Exact(value.clone()).cast_to_sum_type();
+ let sum_value = widened_sum
+ .get_value()
+ .and_then(|sum| {
+ Precision::<ScalarValue>::from(stats.num_rows)
+ .cast_to(&sum.data_type())
+ .ok()
})
+ .map(|row_count| widened_sum.multiply(&row_count))
.unwrap_or(Precision::Absent);
ColumnStatistics {
@@ -2864,6 +2867,35 @@ pub(crate) mod tests {
Ok(())
}
+ #[test]
+ fn test_project_statistics_with_i32_literal_sum_widens_to_i64() ->
Result<()> {
+ let input_stats = get_stats();
+ let input_schema = get_schema();
+
+ let projection = ProjectionExprs::new(vec![
+ ProjectionExpr {
+ expr: Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
+ alias: "constant".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("col0", 0)),
+ alias: "num".to_string(),
+ },
+ ]);
+
+ let output_stats = projection.project_statistics(
+ input_stats,
+ &projection.project_schema(&input_schema)?,
+ )?;
+
+ assert_eq!(
+ output_stats.column_statistics[0].sum_value,
+ Precision::Exact(ScalarValue::Int64(Some(50)))
+ );
+
+ Ok(())
+ }
+
// Test statistics calculation for NULL literal (constant NULL column)
#[test]
fn test_project_statistics_with_null_literal() -> Result<()> {
diff --git a/datafusion/physical-plan/src/joins/cross_join.rs
b/datafusion/physical-plan/src/joins/cross_join.rs
index a895f69dc5..b64de91d95 100644
--- a/datafusion/physical-plan/src/joins/cross_join.rs
+++ b/datafusion/physical-plan/src/joins/cross_join.rs
@@ -458,32 +458,34 @@ fn stats_cartesian_product(
// Min, max and distinct_count on the other hand are invariants.
let cross_join_stats = left_col_stats
.into_iter()
- .map(|s| ColumnStatistics {
- null_count: s.null_count.multiply(&right_row_count),
- distinct_count: s.distinct_count,
- min_value: s.min_value,
- max_value: s.max_value,
- sum_value: s
- .sum_value
- .get_value()
- // Cast the row count into the same type as any existing sum
value
- .and_then(|v| {
- Precision::<ScalarValue>::from(right_row_count)
- .cast_to(&v.data_type())
- .ok()
- })
- .map(|row_count| s.sum_value.multiply(&row_count))
- .unwrap_or(Precision::Absent),
- byte_size: Precision::Absent,
+ .map(|s| {
+ let widened_sum = s.sum_value.cast_to_sum_type();
+ ColumnStatistics {
+ null_count: s.null_count.multiply(&right_row_count),
+ distinct_count: s.distinct_count,
+ min_value: s.min_value,
+ max_value: s.max_value,
+ sum_value: widened_sum
+ .get_value()
+ // Cast the row count into the same type as any existing
sum value
+ .and_then(|v| {
+ Precision::<ScalarValue>::from(right_row_count)
+ .cast_to(&v.data_type())
+ .ok()
+ })
+ .map(|row_count| widened_sum.multiply(&row_count))
+ .unwrap_or(Precision::Absent),
+ byte_size: Precision::Absent,
+ }
})
.chain(right_col_stats.into_iter().map(|s| {
+ let widened_sum = s.sum_value.cast_to_sum_type();
ColumnStatistics {
null_count: s.null_count.multiply(&left_row_count),
distinct_count: s.distinct_count,
min_value: s.min_value,
max_value: s.max_value,
- sum_value: s
- .sum_value
+ sum_value: widened_sum
.get_value()
// Cast the row count into the same type as any existing
sum value
.and_then(|v| {
@@ -491,7 +493,7 @@ fn stats_cartesian_product(
.cast_to(&v.data_type())
.ok()
})
- .map(|row_count| s.sum_value.multiply(&row_count))
+ .map(|row_count| widened_sum.multiply(&row_count))
.unwrap_or(Precision::Absent),
byte_size: Precision::Absent,
}
@@ -875,6 +877,49 @@ mod tests {
assert_eq!(result, expected);
}
+ #[tokio::test]
+ async fn test_stats_cartesian_product_unsigned_sum_widens_to_u64() {
+ let left_row_count = 2;
+ let right_row_count = 3;
+
+ let left = Statistics {
+ num_rows: Precision::Exact(left_row_count),
+ total_byte_size: Precision::Exact(10),
+ column_statistics: vec![ColumnStatistics {
+ distinct_count: Precision::Exact(2),
+ max_value: Precision::Exact(ScalarValue::UInt32(Some(10))),
+ min_value: Precision::Exact(ScalarValue::UInt32(Some(1))),
+ sum_value: Precision::Exact(ScalarValue::UInt32(Some(7))),
+ null_count: Precision::Exact(0),
+ byte_size: Precision::Absent,
+ }],
+ };
+
+ let right = Statistics {
+ num_rows: Precision::Exact(right_row_count),
+ total_byte_size: Precision::Exact(10),
+ column_statistics: vec![ColumnStatistics {
+ distinct_count: Precision::Exact(3),
+ max_value: Precision::Exact(ScalarValue::UInt32(Some(12))),
+ min_value: Precision::Exact(ScalarValue::UInt32(Some(0))),
+ sum_value: Precision::Exact(ScalarValue::UInt32(Some(11))),
+ null_count: Precision::Exact(0),
+ byte_size: Precision::Absent,
+ }],
+ };
+
+ let result = stats_cartesian_product(left, right);
+
+ assert_eq!(
+ result.column_statistics[0].sum_value,
+ Precision::Exact(ScalarValue::UInt64(Some(21)))
+ );
+ assert_eq!(
+ result.column_statistics[1].sum_value,
+ Precision::Exact(ScalarValue::UInt64(Some(22)))
+ );
+ }
+
#[tokio::test]
async fn test_join() -> Result<()> {
let task_ctx = Arc::new(TaskContext::default());
diff --git a/datafusion/physical-plan/src/union.rs
b/datafusion/physical-plan/src/union.rs
index 218eb50015..eb16375a2d 100644
--- a/datafusion/physical-plan/src/union.rs
+++ b/datafusion/physical-plan/src/union.rs
@@ -857,7 +857,7 @@ fn col_stats_union(
left.distinct_count = union_distinct_count(&left, right);
left.min_value = left.min_value.min(&right.min_value);
left.max_value = left.max_value.max(&right.max_value);
- left.sum_value = left.sum_value.add(&right.sum_value);
+ left.sum_value = left.sum_value.add_for_sum(&right.sum_value);
left.null_count = left.null_count.add(&right.null_count);
left
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]