This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-22139-0c4ace8b77461cac3538d38b1122a7dbf08c4d4f
in repository https://gitbox.apache.org/repos/asf/datafusion.git

commit 07c0fc9a5fb4438bd37f75cecbdab3becc93b1d0
Author: Oleks V <[email protected]>
AuthorDate: Wed May 13 07:25:27 2026 -0700

    feat: fix AVG sliding windows edge case (#22139)

    ## Which issue does this PR close?

    - Closes #22138.

    ## Rationale for this change

    `AVG` used as a window aggregate can return `NaN` (and, for `Decimal` /
    `Duration`, panic on integer division by zero) when every value in the
    window frame is NULL.

    ```sql
    SELECT i, AVG(v) OVER (ORDER BY i ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)
    FROM (VALUES (1,1), (2,2), (3,NULL), (4,NULL)) t(i,v);
    ```

    | i | current output | expected (DuckDB/PgSQL) |
    |---|----------------|-------------------------|
    | 1 | 1.5            | 1.5                     |
    | 2 | 2.0            | 2.0                     |
    | 3 | **NaN**        | **NULL**                |
    | 4 | **NaN**        | **NULL**                |

    Root cause: sliding-window execution calls `Accumulator::retract_batch`
    as rows leave the frame. Once every contributing value has been
    retracted, `self.count` drops back to `0` but `self.sum` stays
    `Some(0.0)` (or a tiny floating-point residual). `evaluate()` then
    computes `sum / 0`, which yields `NaN` on `Float64`, and would panic
    with integer division by zero on `DecimalAvgAccumulator` and
    `DurationAvgAccumulator`.

    The non-sliding aggregation path is unaffected because there `sum`
    becomes `Some(_)` only after at least one non-NULL value has been
    added, so `count == 0` implies `sum == None`.

    ## What changes are included in this PR?
    `datafusion/functions-aggregate/src/average.rs`: guard all three
    affected `evaluate()` implementations with an explicit
    `count == 0 → None` short-circuit:

    - `AvgAccumulator::evaluate` (Float64)
    - `DecimalAvgAccumulator::evaluate` (Decimal32/64/128/256)
    - `DurationAvgAccumulator::evaluate` (Duration*)

    This matches the idiom already used by sibling retractable
    accumulators (`variance.rs` uses an explicit `match self.count` before
    division; `sum.rs` uses a `(self.count != 0).then_some(..)` guard).
---
 datafusion/functions-aggregate/src/average.rs | 50 +++++++++++++++++++--------
 datafusion/sqllogictest/test_files/window.slt | 48 +++++++++++++++++++++++++
 2 files changed, 83 insertions(+), 15 deletions(-)

diff --git a/datafusion/functions-aggregate/src/average.rs b/datafusion/functions-aggregate/src/average.rs
index bcccea3813..24f2777797 100644
--- a/datafusion/functions-aggregate/src/average.rs
+++ b/datafusion/functions-aggregate/src/average.rs
@@ -519,9 +519,16 @@ impl Accumulator for AvgAccumulator {
     }
 
     fn evaluate(&mut self) -> Result<ScalarValue> {
-        Ok(ScalarValue::Float64(
-            self.sum.map(|f| f / self.count as f64),
-        ))
+        // In sliding-window mode `retract_batch` can bring `count` back to 0
+        // while `sum` remains `Some(..)` (possibly zero or a floating-point
+        // residual). Guard against that so the frame with no non-NULL values
+        // yields NULL rather than NaN / ±Inf.
+        let avg = if self.count == 0 {
+            None
+        } else {
+            self.sum.map(|f| f / self.count as f64)
+        };
+        Ok(ScalarValue::Float64(avg))
     }
 
     fn size(&self) -> usize {
@@ -584,17 +591,23 @@ impl<T: DecimalType + ArrowNumericType + Debug> Accumulator for DecimalAvgAccumu
     }
 
     fn evaluate(&mut self) -> Result<ScalarValue> {
-        let v = self
-            .sum
-            .map(|v| {
-                DecimalAverager::<T>::try_new(
-                    self.sum_scale,
-                    self.target_precision,
-                    self.target_scale,
-                )?
-                .avg(v, T::Native::from_usize(self.count as usize).unwrap())
-            })
-            .transpose()?;
+        // `count == 0` can occur in sliding-window mode after `retract_batch`
+        // removes every contributing value. Return NULL rather than dividing
+        // by zero (which would panic for integer decimal types).
+        let v = if self.count == 0 {
+            None
+        } else {
+            self.sum
+                .map(|v| {
+                    DecimalAverager::<T>::try_new(
+                        self.sum_scale,
+                        self.target_precision,
+                        self.target_scale,
+                    )?
+                    .avg(v, T::Native::from_usize(self.count as usize).unwrap())
+                })
+                .transpose()?
+        };
 
         ScalarValue::new_primitive::<T>(
             v,
@@ -670,7 +683,14 @@ impl Accumulator for DurationAvgAccumulator {
     }
 
     fn evaluate(&mut self) -> Result<ScalarValue> {
-        let avg = self.sum.map(|sum| sum / self.count as i64);
+        // Guard against `count == 0` which can happen in sliding-window mode
+        // after every contributing value has been retracted. Without this
+        // check we would integer-divide by zero.
+        let avg = if self.count == 0 {
+            None
+        } else {
+            self.sum.map(|sum| sum / self.count as i64)
+        };
 
         match self.result_unit {
             TimeUnit::Second => Ok(ScalarValue::DurationSecond(avg)),
diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt
index 74c2e38baa..457f170cb3 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -6236,6 +6236,54 @@ INNER JOIN issue_20194_t2 t2
 ----
 6774502793 10040029 1
 
+# AVG over a sliding window must yield NULL when the frame has no non-NULL
+# values — including frames that became empty via `retract_batch`. Covers
+# Float64, Decimal, and the narrow-frame retract-to-empty case.
+query IR
+SELECT i, AVG(v) OVER (ORDER BY i ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)
+FROM (VALUES(1,1),(2,2),(3,CAST(NULL AS INT)),(4,CAST(NULL AS INT))) t(i,v)
+ORDER BY i;
+----
+1 1.5
+2 2
+3 NULL
+4 NULL
+
+# All-NULL input — every frame is empty.
+query IR
+SELECT i, AVG(v) OVER (ORDER BY i ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)
+FROM (VALUES(1,CAST(NULL AS INT)),(2,CAST(NULL AS INT))) t(i,v)
+ORDER BY i;
+----
+1 NULL
+2 NULL
+
+# Narrow sliding frame that drains to empty each row.
+query IR
+SELECT i, AVG(v) OVER (ORDER BY i ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING)
+FROM (VALUES(1,CAST(NULL AS INT)),(2,1),(3,CAST(NULL AS INT)),(4,CAST(NULL AS INT))) t(i,v)
+ORDER BY i;
+----
+1 NULL
+2 NULL
+3 1
+4 NULL
+
+# Decimal variant — the integer-division path would otherwise panic on an
+# empty frame.
+query IR
+SELECT i, AVG(v) OVER (ORDER BY i ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)
+FROM (VALUES(1,CAST(1.5 AS DECIMAL(10,2))),
+             (2,CAST(2.5 AS DECIMAL(10,2))),
+             (3,CAST(NULL AS DECIMAL(10,2))),
+             (4,CAST(NULL AS DECIMAL(10,2)))) t(i,v)
+ORDER BY i;
+----
+1 2
+2 2.5
+3 NULL
+4 NULL
+
 # Config reset
 statement ok
 reset datafusion.execution.batch_size;
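
Illustrative note (not part of the commit): the root cause described in the commit message can be reproduced outside DataFusion with a minimal sketch. `SlidingAvg` and its methods are hypothetical stand-ins for a retractable AVG accumulator, not the types or signatures in `average.rs`; the sketch only assumes the state is an optional running sum plus a count of non-NULL values, as the commit message describes.

```rust
/// Hypothetical stand-in for a retractable AVG accumulator.
/// This is NOT DataFusion's `AvgAccumulator`; it only mirrors the
/// `sum: Option<_>` + `count` state described in the commit message.
#[derive(Debug, Default)]
struct SlidingAvg {
    sum: Option<f64>,
    count: u64,
}

impl SlidingAvg {
    /// Rows entering the frame; `None` models SQL NULL and is skipped.
    fn update(&mut self, values: &[Option<f64>]) {
        for v in values.iter().flatten() {
            self.count += 1;
            *self.sum.get_or_insert(0.0) += v;
        }
    }

    /// Rows leaving the frame; `sum` stays `Some(..)` even at count 0.
    fn retract(&mut self, values: &[Option<f64>]) {
        for v in values.iter().flatten() {
            self.count -= 1;
            if let Some(s) = self.sum.as_mut() {
                *s -= v;
            }
        }
    }

    /// Pre-fix behaviour: divides even when `count` is 0.
    fn evaluate_unguarded(&self) -> Option<f64> {
        self.sum.map(|s| s / self.count as f64)
    }

    /// Post-fix behaviour: an empty frame yields `None` (SQL NULL).
    fn evaluate_guarded(&self) -> Option<f64> {
        if self.count == 0 {
            None
        } else {
            self.sum.map(|s| s / self.count as f64)
        }
    }
}

fn main() {
    let mut acc = SlidingAvg::default();
    let frame = [Some(1.0), Some(2.0), None, None];

    // Frame for row 1: all four rows are visible.
    acc.update(&frame);
    assert_eq!(acc.evaluate_guarded(), Some(1.5));

    // Rows 1 and 2 leave the frame; only NULLs remain.
    acc.retract(&frame[..2]);
    assert!(acc.evaluate_unguarded().unwrap().is_nan()); // 0.0 / 0.0
    assert_eq!(acc.evaluate_guarded(), None);
}
```

With an integer sum (as in the Decimal and Duration accumulators) the unguarded division would panic instead of producing `NaN`, which is why the same `count == 0` check is applied to all three `evaluate()` implementations.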
