Jefffrey commented on code in PR #19618:
URL: https://github.com/apache/datafusion/pull/19618#discussion_r2663748909


##########
datafusion/sqllogictest/test_files/aggregate.slt:
##########
@@ -8246,3 +8246,137 @@ query R
 select percentile_cont(null, 0.5);
 ----
 NULL
+
+###########
+# Issue #19612: Test that percentile_cont and median produce identical results
+# in window frame queries with ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW.
+# Previously percentile_cont consumed its internal state during evaluate(),
+# causing incorrect results when called multiple times in window queries.
+###########
+
+# Test percentile_cont window frame behavior (fix for issue #19612)
+statement ok
+CREATE TABLE percentile_window_test (

Review Comment:
   Is this the same as `median_window_test`?
   
   
https://github.com/apache/datafusion/blob/aee5cd9f3517b2ac9536fd4eb254f1e1349711df/datafusion/sqllogictest/test_files/aggregate.slt#L1049-L1068
   
   Could we just move these tests up to reuse that table, so we don't have unnecessary duplication?
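
A minimal, self-contained sketch of the bug described in the test comment: a running frame (`ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`) is modelled as one update followed by one evaluate per row. `TakingAcc`, `BorrowingAcc`, `median` and `running_frame` are hypothetical names over plain `f64` values, not DataFusion APIs. The two structs only mirror the old `std::mem::take` behaviour and the borrowing fix.

```rust
/// Median of a slice (sorts a copy for simplicity); `None` if empty.
fn median(values: &[f64]) -> Option<f64> {
    if values.is_empty() {
        return None;
    }
    let mut v = values.to_vec();
    v.sort_by(|a, b| a.total_cmp(b));
    let n = v.len();
    Some(if n % 2 == 1 {
        v[n / 2]
    } else {
        (v[n / 2 - 1] + v[n / 2]) / 2.0
    })
}

/// Hypothetical: mirrors the old behaviour, `evaluate` moves the buffer out.
struct TakingAcc {
    all_values: Vec<f64>,
}

impl TakingAcc {
    fn update(&mut self, v: f64) {
        self.all_values.push(v);
    }
    fn evaluate(&mut self) -> Option<f64> {
        // The buffer is replaced by an empty Vec: earlier rows are lost.
        let d = std::mem::take(&mut self.all_values);
        median(&d)
    }
}

/// Hypothetical: mirrors the fix, `evaluate` only borrows the buffer.
struct BorrowingAcc {
    all_values: Vec<f64>,
}

impl BorrowingAcc {
    fn update(&mut self, v: f64) {
        self.all_values.push(v);
    }
    fn evaluate(&mut self) -> Option<f64> {
        // State is kept, so the next row's evaluation sees all prior rows.
        median(&self.all_values)
    }
}

/// Feeds one row, then evaluates, for every row (a growing window frame).
fn running_frame(rows: &[f64]) -> (Vec<Option<f64>>, Vec<Option<f64>>) {
    let mut taking = TakingAcc { all_values: Vec::new() };
    let mut borrowing = BorrowingAcc { all_values: Vec::new() };
    let mut taking_out = Vec::new();
    let mut borrowing_out = Vec::new();
    for &r in rows {
        taking.update(r);
        borrowing.update(r);
        taking_out.push(taking.evaluate());
        borrowing_out.push(borrowing.evaluate());
    }
    (taking_out, borrowing_out)
}

fn main() {
    let (taking_out, borrowing_out) = running_frame(&[1.0, 2.0, 3.0, 4.0]);
    println!("taking:    {:?}", taking_out);
    println!("borrowing: {:?}", borrowing_out);
}
```

With rows `1, 2, 3, 4`, the taking variant returns each row's own value because every `evaluate` starts from an emptied buffer, while the borrowing variant returns the running medians `1, 1.5, 2, 2.5`.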



##########
datafusion/functions-aggregate/src/percentile_cont.rs:
##########
@@ -427,14 +428,55 @@ impl<T: ArrowNumericType + Debug> Accumulator for PercentileContAccumulator<T> {
     }
 
     fn evaluate(&mut self) -> Result<ScalarValue> {
-        let d = std::mem::take(&mut self.all_values);
-        let value = calculate_percentile::<T>(d, self.percentile);
+        let value = calculate_percentile::<T>(&mut self.all_values, self.percentile);
         ScalarValue::new_primitive::<T>(value, &T::DATA_TYPE)
     }
 
     fn size(&self) -> usize {
         size_of_val(self) + self.all_values.capacity() * size_of::<T::Native>()
     }
+
+    fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        // Cast to target type if needed (e.g., integer to Float64)
+        let values = if values[0].data_type() != &T::DATA_TYPE {
+            arrow::compute::cast(&values[0], &T::DATA_TYPE)?
+        } else {
+            Arc::clone(&values[0])
+        };

Review Comment:
   We can remove this now that https://github.com/apache/datafusion/pull/19611 removed the need for it.
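
Passing `&mut self.all_values` instead of taking ownership works because computing a percentile only needs to reorder the buffer, not drain it, and the accumulator's state is an unordered multiset. A minimal sketch of that idea, assuming `percentile_cont` uses linear interpolation between the two closest ranks at `p * (n - 1)`; `percentile_cont_in_place` is a hypothetical helper, and the PR's `calculate_percentile` may be implemented differently.

```rust
/// Continuous percentile (linear interpolation) computed in place.
/// `select_nth_unstable_by` permutes the slice but removes nothing,
/// so the caller's buffer stays valid for later evaluations.
fn percentile_cont_in_place(values: &mut [f64], p: f64) -> Option<f64> {
    if values.is_empty() || !(0.0..=1.0).contains(&p) {
        return None;
    }
    let n = values.len();
    let rank = p * (n - 1) as f64;
    let lower = rank.floor() as usize;
    let upper = rank.ceil() as usize;

    // Puts the `lower`-th smallest value at index `lower`; everything in
    // `right` compares greater than or equal to it.
    let (_, lo_val, right) = values.select_nth_unstable_by(lower, |a, b| a.total_cmp(b));
    let lo_val = *lo_val;
    if upper == lower {
        return Some(lo_val);
    }
    // The next rank up is the minimum of the right partition.
    let hi_val = right
        .iter()
        .copied()
        .min_by(|a, b| a.total_cmp(b))
        .expect("right partition is non-empty when upper > lower");
    Some(lo_val + (hi_val - lo_val) * (rank - lower as f64))
}

fn main() {
    let mut buf = vec![4.0, 1.0, 3.0, 2.0];
    let first = percentile_cont_in_place(&mut buf, 0.5);
    let second = percentile_cont_in_place(&mut buf, 0.5);
    println!("{:?} {:?} (len still {})", first, second, buf.len());
}
```

Because the selection only permutes the slice, calling the helper twice on the same buffer yields the same result and leaves every value in place, which is the property a window frame needs.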



##########
datafusion/functions-aggregate/src/percentile_cont.rs:
##########
@@ -427,14 +428,55 @@ impl<T: ArrowNumericType + Debug> Accumulator for PercentileContAccumulator<T> {
     }
 
     fn evaluate(&mut self) -> Result<ScalarValue> {
-        let d = std::mem::take(&mut self.all_values);
-        let value = calculate_percentile::<T>(d, self.percentile);
+        let value = calculate_percentile::<T>(&mut self.all_values, self.percentile);
         ScalarValue::new_primitive::<T>(value, &T::DATA_TYPE)
     }
 
     fn size(&self) -> usize {
         size_of_val(self) + self.all_values.capacity() * size_of::<T::Native>()
     }
+
+    fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        // Cast to target type if needed (e.g., integer to Float64)
+        let values = if values[0].data_type() != &T::DATA_TYPE {
+            arrow::compute::cast(&values[0], &T::DATA_TYPE)?
+        } else {
+            Arc::clone(&values[0])
+        };
+
+        let mut to_remove: HashMap<ScalarValue, usize> = HashMap::new();
+        for i in 0..values.len() {
+            let v = ScalarValue::try_from_array(&values, i)?;
+            if !v.is_null() {
+                *to_remove.entry(v).or_default() += 1;
+            }
+        }
+
+        let mut i = 0;
+        while i < self.all_values.len() {
+            let k =
+                ScalarValue::new_primitive::<T>(Some(self.all_values[i]), &T::DATA_TYPE)?;
+            if let Some(count) = to_remove.get_mut(&k)
+                && *count > 0
+            {
+                self.all_values.swap_remove(i);
+                *count -= 1;
+                if *count == 0 {
+                    to_remove.remove(&k);
+                    if to_remove.is_empty() {
+                        break;
+                    }
+                }

Review Comment:
   Just for my own reference (and for anyone else reviewing), this is the same code as in median:
   
   - https://github.com/apache/datafusion/pull/19278
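
The retraction loop quoted above treats `all_values` as a multiset: it counts how many copies of each value to remove, then scans the buffer once, using `swap_remove` so each removal is O(1) at the cost of element order, which a percentile does not need. A sketch of the same strategy on plain `f64` values; `retract` is a hypothetical name, and because `f64` is not `Hash`, the map is keyed by `f64::to_bits` as an assumed stand-in for the `ScalarValue` keys in the PR (so `0.0` and `-0.0` count as different values here).

```rust
use std::collections::HashMap;

/// Removes one occurrence of each value in `to_retract` from `all_values`.
/// The order of `all_values` is not preserved.
fn retract(all_values: &mut Vec<f64>, to_retract: &[f64]) {
    // Count how many copies of each value must be removed.
    let mut to_remove: HashMap<u64, usize> = HashMap::new();
    for v in to_retract {
        *to_remove.entry(v.to_bits()).or_default() += 1;
    }

    let mut i = 0;
    // Stop early once nothing is left to remove (like the PR's `break`).
    while i < all_values.len() && !to_remove.is_empty() {
        let key = all_values[i].to_bits();
        if let Some(count) = to_remove.get_mut(&key) {
            // O(1) removal: the last element moves into slot `i`, so `i`
            // is not advanced and the moved element is checked next.
            all_values.swap_remove(i);
            *count -= 1;
            if *count == 0 {
                to_remove.remove(&key);
            }
        } else {
            i += 1;
        }
    }
}

fn main() {
    let mut vals = vec![1.0, 2.0, 2.0, 3.0, 4.0];
    retract(&mut vals, &[2.0, 4.0]);
    println!("{:?}", vals);
}
```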



##########
docs/source/library-user-guide/functions/adding-udfs.md:
##########
@@ -1350,6 +1350,71 @@ async fn main() -> Result<()> {
 [`create_udaf`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/fn.create_udaf.html
 [`advanced_udaf.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/udf/advanced_udaf.rs
 
+### Window Frame Compatible Accumulators

Review Comment:
   Thanks for adding this documentation. However, I think it's better to have it closer to the source, i.e. fix the docstring of `evaluate` itself to capture this subtlety.
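
One way the subtlety could be captured in a docstring, sketched on a hypothetical, trimmed-down trait (`AccumulatorSketch`, `RunningSum`). The wording is illustrative only and is not DataFusion's actual `Accumulator::evaluate` documentation.

```rust
/// Hypothetical, trimmed-down stand-in for an aggregate accumulator trait.
trait AccumulatorSketch {
    /// Adds input values to the accumulator state.
    fn update_batch(&mut self, values: &[i64]);

    /// Returns the aggregate value for the current state.
    ///
    /// This may be called more than once on the same accumulator, for
    /// example once per output row when the accumulator backs a window
    /// frame such as `ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`.
    /// Implementations must therefore not consume or reset their state
    /// (for example with `std::mem::take`): later `update_batch` calls
    /// build on the state that existed before `evaluate`.
    fn evaluate(&mut self) -> Option<i64>;
}

/// Hypothetical implementation that respects the contract: `evaluate` only reads.
struct RunningSum {
    sum: i64,
    seen_any: bool,
}

impl AccumulatorSketch for RunningSum {
    fn update_batch(&mut self, values: &[i64]) {
        for v in values {
            self.sum += v;
            self.seen_any = true;
        }
    }

    fn evaluate(&mut self) -> Option<i64> {
        // No input yet means a NULL-like result.
        self.seen_any.then_some(self.sum)
    }
}

fn main() {
    let mut acc = RunningSum { sum: 0, seen_any: false };
    acc.update_batch(&[1, 2]);
    println!("{:?} {:?}", acc.evaluate(), acc.evaluate());
}
```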



##########
datafusion/functions-aggregate/src/string_agg.rs:
##########
@@ -384,14 +384,13 @@ impl Accumulator for SimpleStringAggAccumulator {
     }
 
     fn evaluate(&mut self) -> Result<ScalarValue> {
-        let result = if self.has_value {
-            ScalarValue::LargeUtf8(Some(std::mem::take(&mut self.accumulated_string)))
+        if self.has_value {
+            Ok(ScalarValue::LargeUtf8(Some(
+                self.accumulated_string.clone(),

Review Comment:
   I guess this is unavoidable 🙁 
   
   I might need to think on this a bit to see if there are ways around requiring this clone 🤔
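
Some context for why the clone is costly: in a growing frame the accumulated string gets longer every row, and each `evaluate` copies all of it, so the total bytes copied grow roughly quadratically with the frame size. The sketch is hypothetical (`StringAggSketch`, `running_frame`): it returns `Option<String>` rather than `ScalarValue::LargeUtf8`, and its `evaluate` takes `&self` for simplicity, whereas the real `Accumulator::evaluate` takes `&mut self`.

```rust
/// Hypothetical, simplified string-concatenating accumulator.
struct StringAggSketch {
    delimiter: String,
    accumulated_string: String,
    has_value: bool,
}

impl StringAggSketch {
    fn new(delimiter: &str) -> Self {
        StringAggSketch {
            delimiter: delimiter.to_string(),
            accumulated_string: String::new(),
            has_value: false,
        }
    }

    fn update(&mut self, value: &str) {
        if self.has_value {
            self.accumulated_string.push_str(&self.delimiter);
        }
        self.accumulated_string.push_str(value);
        self.has_value = true;
    }

    /// Clones so the state survives repeated calls; each call copies the
    /// whole accumulated string.
    fn evaluate(&self) -> Option<String> {
        self.has_value.then(|| self.accumulated_string.clone())
    }
}

/// Runs a growing frame, returning each output and the total bytes cloned.
fn running_frame(rows: &[&str], delimiter: &str) -> (Vec<Option<String>>, usize) {
    let mut acc = StringAggSketch::new(delimiter);
    let mut outputs = Vec::new();
    let mut bytes_copied = 0;
    for row in rows {
        acc.update(row);
        let out = acc.evaluate();
        bytes_copied += out.as_ref().map_or(0, |s| s.len());
        outputs.push(out);
    }
    (outputs, bytes_copied)
}

fn main() {
    let (outputs, bytes) = running_frame(&["a", "b", "c"], ",");
    println!("{:?}, {} bytes cloned", outputs, bytes);
}
```

For rows `a`, `b`, `c` with delimiter `,`, the outputs are `a`, `a,b`, `a,b,c`, and 1 + 3 + 5 = 9 bytes are cloned in total.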



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
