Jefffrey commented on code in PR #19618:
URL: https://github.com/apache/datafusion/pull/19618#discussion_r2663748909
##########
datafusion/sqllogictest/test_files/aggregate.slt:
##########
@@ -8246,3 +8246,137 @@ query R
 select percentile_cont(null, 0.5);
 ----
 NULL
+
+###########
+# Issue #19612: Test that percentile_cont and median produce identical results
+# in window frame queries with ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW.
+# Previously percentile_cont consumed its internal state during evaluate(),
+# causing incorrect results when called multiple times in window queries.
+###########
+
+# Test percentile_cont window frame behavior (fix for issue #19612)
+statement ok
+CREATE TABLE percentile_window_test (

Review Comment:
   Is this the same as `median_window_test`?
   
   https://github.com/apache/datafusion/blob/aee5cd9f3517b2ac9536fd4eb254f1e1349711df/datafusion/sqllogictest/test_files/aggregate.slt#L1049-L1068
   
   Could we just move these tests up to reuse that table so we don't have unnecessary duplication?

##########
datafusion/functions-aggregate/src/percentile_cont.rs:
##########
@@ -427,14 +428,55 @@ impl<T: ArrowNumericType + Debug> Accumulator for PercentileContAccumulator<T> {
     }
 
     fn evaluate(&mut self) -> Result<ScalarValue> {
-        let d = std::mem::take(&mut self.all_values);
-        let value = calculate_percentile::<T>(d, self.percentile);
+        let value = calculate_percentile::<T>(&mut self.all_values, self.percentile);
         ScalarValue::new_primitive::<T>(value, &T::DATA_TYPE)
     }
 
     fn size(&self) -> usize {
         size_of_val(self) + self.all_values.capacity() * size_of::<T::Native>()
     }
+
+    fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        // Cast to target type if needed (e.g., integer to Float64)
+        let values = if values[0].data_type() != &T::DATA_TYPE {
+            arrow::compute::cast(&values[0], &T::DATA_TYPE)?
+        } else {
+            Arc::clone(&values[0])
+        };

Review Comment:
   We can remove this now that https://github.com/apache/datafusion/pull/19611 fixed the need to do this.

##########
datafusion/functions-aggregate/src/percentile_cont.rs:
##########
@@ -427,14 +428,55 @@ impl<T: ArrowNumericType + Debug> Accumulator for PercentileContAccumulator<T> {
     }
 
     fn evaluate(&mut self) -> Result<ScalarValue> {
-        let d = std::mem::take(&mut self.all_values);
-        let value = calculate_percentile::<T>(d, self.percentile);
+        let value = calculate_percentile::<T>(&mut self.all_values, self.percentile);
         ScalarValue::new_primitive::<T>(value, &T::DATA_TYPE)
     }
 
     fn size(&self) -> usize {
         size_of_val(self) + self.all_values.capacity() * size_of::<T::Native>()
     }
+
+    fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        // Cast to target type if needed (e.g., integer to Float64)
+        let values = if values[0].data_type() != &T::DATA_TYPE {
+            arrow::compute::cast(&values[0], &T::DATA_TYPE)?
+        } else {
+            Arc::clone(&values[0])
+        };
+
+        let mut to_remove: HashMap<ScalarValue, usize> = HashMap::new();
+        for i in 0..values.len() {
+            let v = ScalarValue::try_from_array(&values, i)?;
+            if !v.is_null() {
+                *to_remove.entry(v).or_default() += 1;
+            }
+        }
+
+        let mut i = 0;
+        while i < self.all_values.len() {
+            let k =
+                ScalarValue::new_primitive::<T>(Some(self.all_values[i]), &T::DATA_TYPE)?;
+            if let Some(count) = to_remove.get_mut(&k)
+                && *count > 0
+            {
+                self.all_values.swap_remove(i);
+                *count -= 1;
+                if *count == 0 {
+                    to_remove.remove(&k);
+                    if to_remove.is_empty() {
+                        break;
+                    }
+                }

Review Comment:
   Just for my own reference (and anyone else reviewing), this is the same code as from median:
   - https://github.com/apache/datafusion/pull/19278

##########
docs/source/library-user-guide/functions/adding-udfs.md:
##########
@@ -1350,6 +1350,71 @@ async fn main() -> Result<()> {
 [`create_udaf`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/fn.create_udaf.html
 [`advanced_udaf.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/udf/advanced_udaf.rs
+### Window Frame Compatible Accumulators

Review Comment:
   Thanks for adding this documentation; however, I was thinking it's better to have it closer to the source, i.e. fix the docstring of `evaluate` itself to capture this subtlety.

##########
datafusion/functions-aggregate/src/string_agg.rs:
##########
@@ -384,14 +384,13 @@ impl Accumulator for SimpleStringAggAccumulator {
     }
 
     fn evaluate(&mut self) -> Result<ScalarValue> {
-        let result = if self.has_value {
-            ScalarValue::LargeUtf8(Some(std::mem::take(&mut self.accumulated_string)))
+        if self.has_value {
+            Ok(ScalarValue::LargeUtf8(Some(
+                self.accumulated_string.clone(),

Review Comment:
   I guess this is unavoidable 🙁
   
   I might need to think on this a bit to see if there are ways around requiring this clone 🤔
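The window-frame bug behind the `evaluate` changes in this PR (issue #19612) can be reproduced with a small self-contained model. This is a hypothetical sketch, not DataFusion's `Accumulator` API. `PercentileAcc`, `percentile_in_place`, and `running` are illustrative names. The model assumes linear-interpolation `percentile_cont` semantics over `f64` values and simulates `ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW` as one update followed by one evaluate per row on a single accumulator. It contrasts an `evaluate` that moves the buffer out with `std::mem::take` against one that only borrows it mutably.

```rust
// Hypothetical model of a percentile accumulator; NOT DataFusion's
// `Accumulator` trait, just enough to show the state-handling issue.
struct PercentileAcc {
    values: Vec<f64>,
    percentile: f64,
}

/// Linear-interpolation percentile (PERCENTILE_CONT semantics) computed by
/// sorting the caller's buffer in place rather than taking ownership of it.
fn percentile_in_place(values: &mut [f64], percentile: f64) -> Option<f64> {
    if values.is_empty() {
        return None;
    }
    values.sort_by(f64::total_cmp);
    let rank = percentile * (values.len() - 1) as f64;
    let (lo, hi) = (rank.floor() as usize, rank.ceil() as usize);
    let frac = rank - lo as f64;
    Some(values[lo] + (values[hi] - values[lo]) * frac)
}

impl PercentileAcc {
    fn new(percentile: f64) -> Self {
        Self { values: Vec::new(), percentile }
    }

    fn update(&mut self, v: f64) {
        self.values.push(v);
    }

    /// Buggy pattern: `mem::take` moves the buffer out and leaves it empty,
    /// so every later evaluation only sees rows added since the last one.
    fn evaluate_destructive(&mut self) -> Option<f64> {
        let mut taken = std::mem::take(&mut self.values);
        percentile_in_place(&mut taken, self.percentile)
    }

    /// Fixed pattern: borrow the buffer; sorting it keeps the same multiset.
    fn evaluate(&mut self) -> Option<f64> {
        percentile_in_place(&mut self.values, self.percentile)
    }
}

/// Simulates ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:
/// one update and one evaluate per input row, on a single accumulator.
fn running(rows: &[f64], destructive: bool) -> Vec<Option<f64>> {
    let mut acc = PercentileAcc::new(0.5);
    rows.iter()
        .map(|&v| {
            acc.update(v);
            if destructive {
                acc.evaluate_destructive()
            } else {
                acc.evaluate()
            }
        })
        .collect()
}

fn main() {
    let rows = [1.0, 3.0, 2.0, 4.0];
    println!("borrowing: {:?}", running(&rows, false));
    println!("mem::take: {:?}", running(&rows, true));
}
```

With the rows `[1, 3, 2, 4]`, the borrowing version yields the running medians 1, 2, 2, 2.5. The `mem::take` version only ever sees the most recent row, yielding 1, 3, 2, 4. Reordering the buffer during evaluation is harmless in this model because later updates only append to it.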

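The counting approach used by `retract_batch` in `percentile_cont.rs`, noted in review as the same code used for median in #19278, can be sketched without Arrow types. This is a hypothetical simplification. `retract` is an illustrative name. Values are `i64` rather than `ScalarValue` keys, because `f64` implements neither `Hash` nor `Eq`. Nulls are modelled as `None`.

```rust
use std::collections::HashMap;

/// Hypothetical helper: removes one occurrence from `all_values` for each
/// non-null entry in `to_retract`. Values that are not present are ignored.
fn retract(all_values: &mut Vec<i64>, to_retract: &[Option<i64>]) {
    // Multiset of values still to remove: value -> remaining count
    let mut to_remove: HashMap<i64, usize> = HashMap::new();
    for v in to_retract.iter().flatten() {
        *to_remove.entry(*v).or_default() += 1;
    }

    let mut i = 0;
    // Stop early once every requested removal has been satisfied
    while i < all_values.len() && !to_remove.is_empty() {
        let k = all_values[i];
        if let Some(count) = to_remove.get_mut(&k) {
            // O(1) removal: the last element is moved into slot `i`
            all_values.swap_remove(i);
            *count -= 1;
            if *count == 0 {
                to_remove.remove(&k);
            }
            // `i` is not advanced, so the element moved into slot `i` is checked next
        } else {
            i += 1;
        }
    }
}

fn main() {
    let mut values = vec![5, 1, 5, 3, 2];
    retract(&mut values, &[Some(5), None, Some(3)]);
    values.sort_unstable();
    println!("{:?}", values);
}
```

`swap_remove` keeps each removal O(1) at the cost of reordering `all_values`. That trade-off is acceptable here because a percentile over a multiset does not depend on element order.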