SubhamSinghal commented on code in PR #22015:
URL: https://github.com/apache/datafusion/pull/22015#discussion_r3206596108
##########
datafusion/functions-aggregate/src/array_agg.rs:
##########
@@ -1935,4 +1991,293 @@ mod tests {
Ok(())
}
+
+ // ---- retract_batch tests ----
+
+ /// Drives the accumulator through the update/retract calls a
+ /// `ROWS BETWEEN 1 PRECEDING AND CURRENT ROW` window makes over [A, B, C, D],
+ /// checking the aggregated list after every row.
+ #[test]
+ fn retract_basic_sliding_window() -> Result<()> {
+ let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?;
+
+ // Simulate ROWS BETWEEN 1 PRECEDING AND CURRENT ROW over [A, B, C, D]
+ // Each row adds the entering value, retracts the leaving one (if any),
+ // then checks the resulting frame.
+ // Row 1: frame = [A]
+ acc.update_batch(&[data(["A"])])?;
+ assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["A"]);
+
+ // Row 2: frame = [A, B]
+ acc.update_batch(&[data(["B"])])?;
+ assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["A", "B"]);
+
+ // Row 3: frame = [B, C] — A leaves
+ acc.update_batch(&[data(["C"])])?;
+ acc.retract_batch(&[data(["A"])])?;
+ assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["B", "C"]);
+
+ // Row 4: frame = [C, D] — B leaves
+ acc.update_batch(&[data(["D"])])?;
+ acc.retract_batch(&[data(["B"])])?;
+ assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["C", "D"]);
+
+ Ok(())
+ }
+
+ #[test]
+ fn retract_multi_element_across_arrays() -> Result<()> {
+ let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?;
+
+ // First batch: 3 elements
+ acc.update_batch(&[data(["A", "B", "C"])])?;
+ // Second batch: 1 element
+ acc.update_batch(&[data(["D"])])?;
+
+ assert_eq!(
+ print_nulls(str_arr(acc.evaluate()?)?),
+ vec!["A", "B", "C", "D"]
+ );
+
+ // Partial retract from front array: A leaves
+ acc.retract_batch(&[data(["A"])])?;
+ assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["B", "C",
"D"]);
+
+ // Retract spanning two arrays: B, C (rest of first array) + D (second
array)
+ acc.retract_batch(&[data(["B", "C", "D"])])?;
+ let result = acc.evaluate()?;
+ assert!(
+ matches!(&result, ScalarValue::List(arr) if arr.is_null(0)),
+ "expected null list after full retract, got {result:?}"
+ );
+
+ Ok(())
+ }
+
+ /// With `ignore_nulls = false`, NULL inputs are kept in the aggregated list,
+ /// and a NULL in a retract batch removes one stored element like any value.
+ #[test]
+ fn retract_with_nulls_preserved() -> Result<()> {
+ // ignore_nulls = false: NULLs are stored and counted for retract
+ let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?;
+
+ // print_nulls renders stored NULL entries as the string "NULL".
+ acc.update_batch(&[data([Some("A"), None, Some("C")])])?;
+ assert_eq!(
+ print_nulls(str_arr(acc.evaluate()?)?),
+ vec!["A", "NULL", "C"]
+ );
+
+ // Retract 2 elements: A and NULL both leave
+ acc.retract_batch(&[data([Some("A"), None])])?;
+ assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["C"]);
+
+ Ok(())
+ }
+
+ /// With `ignore_nulls = true`, a retract batch removes only as many stored
+ /// values as it has non-null entries. This holds regardless of where the
+ /// NULLs sit within the retract batch.
+ #[test]
+ fn retract_with_ignore_nulls() -> Result<()> {
+ // ignore_nulls = true: NULLs are NOT stored by update_batch,
+ // so retract must only count non-null values
+ let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, true)?;
+
+ // update_batch with [A, NULL, C] → stores only [A, C] (NULL filtered)
+ acc.update_batch(&[data([Some("A"), None, Some("C")])])?;
+ assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["A", "C"]);
+
+ // retract_batch receives the original values including NULL: [A, NULL]
+ // But only 1 non-null value (A) should be retracted
+ acc.retract_batch(&[data([Some("A"), None])])?;
+ assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["C"]);
+
+ // retract_batch with [NULL, C] — only C (1 non-null) retracted
+ // (here the NULL comes first, unlike the previous retract)
+ acc.retract_batch(&[data([None, Some("C")])])?;
+ let result = acc.evaluate()?;
+ assert!(
+ matches!(&result, ScalarValue::List(arr) if arr.is_null(0)),
+ "expected null list after full retract, got {result:?}"
+ );
+
+ Ok(())
+ }
+
+ /// A retract batch made up solely of NULLs must leave an
+ /// `ignore_nulls = true` accumulator unchanged.
+ #[test]
+ fn retract_ignore_nulls_all_nulls_batch() -> Result<()> {
+ // When ignore_nulls = true and retract batch is all NULLs, nothing is retracted
+ let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, true)?;
+
+ acc.update_batch(&[data([Some("A"), Some("B")])])?;
+ assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["A", "B"]);
+
+ // Retract batch of all NULLs: to_retract = 0, nothing changes
+ // Explicit type arguments: an all-`None` array literal leaves the element
+ // type uninferable, unlike the `Some(..)` literals used elsewhere.
+ acc.retract_batch(&[data::<Option<&str>, 3>([None, None, None])])?;
+ assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["A", "B"]);
+
+ Ok(())
+ }
+
+ /// Retracting from an accumulator that never received any values must
+ /// succeed (an `Err` would fail the test via `?`) and leave it empty.
+ #[test]
+ fn retract_empty_accumulator() -> Result<()> {
+ let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?;
+
+ // Retract on empty accumulator should be a no-op
+ acc.retract_batch(&[data(["A"])])?;
+ let result = acc.evaluate()?;
+ // An empty accumulator evaluates to a List whose single entry is null.
+ assert!(
+ matches!(&result, ScalarValue::List(arr) if arr.is_null(0)),
+ "expected null list for empty accumulator, got {result:?}"
+ );
+
+ Ok(())
+ }
+
+ #[test]
+ fn retract_front_offset_partial_consume() -> Result<()> {
+ // Reproduces the RANGE BETWEEN 2 PRECEDING AND 2 FOLLOWING scenario:
+ // ts: 1, 2, 3, 4, 100
+ //
+ // Row 1 (ts=1): update [A,B,C] (3 elements, ts in [-1,3])
+ // Row 2 (ts=2): update [D] (ts=4 enters)
+ // Row 3 (ts=3): no change (same frame [0..4))
+ // Row 4 (ts=4): retract [A] (ts=1 leaves, partial consume)
+ // Row 5 (ts=100): retract [B,C,D] (3-element retract spanning arrays)
+ let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?;
+
+ // Row 1: update_batch(["A","B","C"])
+ acc.update_batch(&[data(["A", "B", "C"])])?;
+ assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["A", "B",
"C"]);
+
+ // Row 2: update_batch(["D"])
+ acc.update_batch(&[data(["D"])])?;
+ assert_eq!(
+ print_nulls(str_arr(acc.evaluate()?)?),
+ vec!["A", "B", "C", "D"]
+ );
+
+ // Row 4: retract_batch(["A"]) — partial consume, front_offset = 1
+ acc.retract_batch(&[data(["A"])])?;
+ assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["B", "C",
"D"]);
+
+ // Row 5: update_batch(["E"]), then retract_batch(["B","C","D"])
+ // retract spans: ["A","B","C"] (offset=1, 2 remaining) + ["D"] (1
element)
+ acc.update_batch(&[data(["E"])])?;
+ acc.retract_batch(&[data(["B", "C", "D"])])?;
+ assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["E"]);
+
+ Ok(())
+ }
+
+ /// After every stored value has been retracted, the accumulator must accept
+ /// new updates and retracts as if freshly created.
+ #[test]
+ fn retract_update_after_full_drain() -> Result<()> {
+ // Verify accumulator works correctly after being fully drained
+ let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?;
+
+ acc.update_batch(&[data(["A", "B"])])?;
+ acc.retract_batch(&[data(["A", "B"])])?;
+
+ // Accumulator is empty now
+ let result = acc.evaluate()?;
+ assert!(
+ matches!(&result, ScalarValue::List(arr) if arr.is_null(0)),
+ "expected null list, got {result:?}"
+ );
+
+ // New values should work normally after drain
+ acc.update_batch(&[data(["X", "Y"])])?;
+ assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["X", "Y"]);
+
+ // Partial retract on the refilled accumulator: X leaves, Y remains
+ acc.retract_batch(&[data(["X"])])?;
+ assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["Y"]);
+
+ Ok(())
+ }
+
+ /// Retraction must be advertised in both null-handling modes.
+ #[test]
+ fn retract_supports_retract_batch() -> Result<()> {
+ // One accumulator per `ignore_nulls` setting: (false, true).
+ let (acc, acc_ignore) = (
+ ArrayAggAccumulator::try_new(&DataType::Utf8, false)?,
+ ArrayAggAccumulator::try_new(&DataType::Utf8, true)?,
+ );
+
+ assert!(acc.supports_retract_batch());
+ assert!(acc_ignore.supports_retract_batch());
+
+ Ok(())
+ }
+
+ #[test]
+ fn retract_ignore_nulls_logical_vs_physical() -> Result<()> {
+ // Regression test: DictionaryArray where logical nulls differ from
physical nulls.
+ // Indices are all valid (physical null_count = 0) but some point to
null
+ // dictionary values (logical_null_count > 0).
+ use arrow::array::StringDictionaryBuilder;
+
+ let dict_type =
+ DataType::Dictionary(Box::new(DataType::Int32),
Box::new(DataType::Utf8));
+ let mut acc = ArrayAggAccumulator::try_new(&dict_type, true)?;
+
+ // Build a DictionaryArray: ["hello", NULL, "world", NULL]
+ // All indices valid → physical null_count = 0
+ // Indices 1,3 point to null → logical_null_count = 2
+ let mut builder =
StringDictionaryBuilder::<arrow::datatypes::Int32Type>::new();
+ builder.append_value("hello");
+ builder.append_null();
+ builder.append_value("world");
+ builder.append_null();
+ let dict_array: ArrayRef = Arc::new(builder.finish());
+
+ assert_eq!(dict_array.null_count(), 2);
Review Comment:
Added assertion for `null_count() != logical_null_count()`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]