This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch 
gh-readonly-queue/main/pr-22015-89ac320dfdfcda012518ce783f918450ea36b127
in repository https://gitbox.apache.org/repos/asf/datafusion.git

commit 7e439a254ec4fa8834de361f513081f931337408
Author: Subham Singhal <[email protected]>
AuthorDate: Tue May 12 08:09:23 2026 +0530

    feat: implement retract_batch for array_agg sliding window support (#22015)
    
    ## Which issue does this PR close?
    Closes [#21957](https://github.com/apache/datafusion/issues/21957).
    ## Rationale for this change
    
    ## What changes are included in this PR?
    
    ## Are these changes tested?
    
    Yes. Using UT
    ## Are there any user-facing changes?
    Yes. `array_agg` now works with bounded/sliding window frames. Queries
    that previously errored now succeed:
    
    ```sql
    SELECT array_agg(x) OVER (ORDER BY t ROWS BETWEEN 1 PRECEDING AND
    CURRENT ROW) FROM t;
      ```
      No breaking API changes
    
    ---------
    
    Co-authored-by: Subham Singhal <[email protected]>
---
 datafusion/functions-aggregate/src/array_agg.rs    | 387 ++++++++++++++++++++-
 .../test_files/array_agg_sliding_window.slt        | 185 ++++++++++
 2 files changed, 561 insertions(+), 11 deletions(-)

diff --git a/datafusion/functions-aggregate/src/array_agg.rs 
b/datafusion/functions-aggregate/src/array_agg.rs
index 861d7712ba..4365c436a5 100644
--- a/datafusion/functions-aggregate/src/array_agg.rs
+++ b/datafusion/functions-aggregate/src/array_agg.rs
@@ -256,18 +256,22 @@ impl AggregateUDFImpl for ArrayAgg {
 
 #[derive(Debug)]
 pub struct ArrayAggAccumulator {
-    values: Vec<ArrayRef>,
+    values: VecDeque<ArrayRef>,
     datatype: DataType,
     ignore_nulls: bool,
+    /// Number of elements already consumed (retracted) from the front array.
+    /// Used by sliding window frames to avoid copying on partial retract.
+    front_offset: usize,
 }
 
 impl ArrayAggAccumulator {
     /// new array_agg accumulator based on given item data type
     pub fn try_new(datatype: &DataType, ignore_nulls: bool) -> Result<Self> {
         Ok(Self {
-            values: vec![],
+            values: VecDeque::new(),
             datatype: datatype.clone(),
             ignore_nulls,
+            front_offset: 0,
         })
     }
 
@@ -356,7 +360,7 @@ impl Accumulator for ArrayAggAccumulator {
         };
 
         if !val.is_empty() {
-            self.values.push(val)
+            self.values.push_back(val)
         }
 
         Ok(())
@@ -376,12 +380,12 @@ impl Accumulator for ArrayAggAccumulator {
             Some(values) => {
                 // Make sure we don't insert empty lists
                 if !values.is_empty() {
-                    self.values.push(values);
+                    self.values.push_back(values);
                 }
             }
             None => {
                 for arr in list_arr.iter().flatten() {
-                    self.values.push(arr);
+                    self.values.push_back(arr);
                 }
             }
         }
@@ -394,19 +398,71 @@ impl Accumulator for ArrayAggAccumulator {
     }
 
     fn evaluate(&mut self) -> Result<ScalarValue> {
-        // Transform Vec<ListArr> to ListArr
-        let element_arrays: Vec<&dyn Array> =
-            self.values.iter().map(|a| a.as_ref()).collect();
+        if self.values.is_empty() {
+            return Ok(ScalarValue::new_null_list(self.datatype.clone(), true, 
1));
+        }
+
+        let element_arrays: Vec<ArrayRef> = self
+            .values
+            .iter()
+            .enumerate()
+            .map(|(i, a)| {
+                if i == 0 && self.front_offset > 0 {
+                    a.slice(self.front_offset, a.len() - self.front_offset)
+                } else {
+                    Arc::clone(a)
+                }
+            })
+            .collect();
 
-        if element_arrays.is_empty() {
+        let element_refs: Vec<&dyn Array> =
+            element_arrays.iter().map(|a| a.as_ref()).collect();
+
+        if element_refs.iter().all(|a| a.is_empty()) {
             return Ok(ScalarValue::new_null_list(self.datatype.clone(), true, 
1));
         }
 
-        let concated_array = arrow::compute::concat(&element_arrays)?;
+        let concated_array = arrow::compute::concat(&element_refs)?;
 
         Ok(SingleRowListArrayBuilder::new(concated_array).build_list_scalar())
     }
 
+    fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        if values.is_empty() {
+            return Ok(());
+        }
+
+        assert_eq_or_internal_err!(values.len(), 1, "expects single batch");
+
+        let val = &values[0];
+        let mut to_retract = if self.ignore_nulls {
+            val.len() - val.logical_null_count()
+        } else {
+            val.len()
+        };
+
+        while to_retract > 0 {
+            let Some(front) = self.values.front() else {
+                break;
+            };
+            let available = front.len() - self.front_offset;
+            if to_retract >= available {
+                self.values.pop_front();
+                to_retract -= available;
+                self.front_offset = 0;
+            } else {
+                self.front_offset += to_retract;
+                to_retract = 0;
+            }
+        }
+
+        Ok(())
+    }
+
+    fn supports_retract_batch(&self) -> bool {
+        true
+    }
+
     fn size(&self) -> usize {
         size_of_val(self)
             + (size_of::<ArrayRef>() * self.values.capacity())
@@ -1415,7 +1471,7 @@ mod tests {
         acc2.update_batch(&[data(["b", "c", "a"])])?;
         acc1 = merge(acc1, acc2)?;
 
-        assert_eq!(acc1.size(), 266);
+        assert_eq!(acc1.size(), 282);
 
         Ok(())
     }
@@ -1935,4 +1991,313 @@ mod tests {
 
         Ok(())
     }
+
+    // ---- retract_batch tests ----
+
+    #[test]
+    fn retract_basic_sliding_window() -> Result<()> {
+        let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?;
+
+        // Simulate ROWS BETWEEN 1 PRECEDING AND CURRENT ROW over [A, B, C, D]
+        // Row 1: frame = [A]
+        acc.update_batch(&[data(["A"])])?;
+        assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["A"]);
+
+        // Row 2: frame = [A, B]
+        acc.update_batch(&[data(["B"])])?;
+        assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["A", "B"]);
+
+        // Row 3: frame = [B, C] — A leaves
+        acc.update_batch(&[data(["C"])])?;
+        acc.retract_batch(&[data(["A"])])?;
+        assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["B", "C"]);
+
+        // Row 4: frame = [C, D] — B leaves
+        acc.update_batch(&[data(["D"])])?;
+        acc.retract_batch(&[data(["B"])])?;
+        assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["C", "D"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn retract_multi_element_across_arrays() -> Result<()> {
+        let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?;
+
+        // First batch: 3 elements
+        acc.update_batch(&[data(["A", "B", "C"])])?;
+        // Second batch: 1 element
+        acc.update_batch(&[data(["D"])])?;
+
+        assert_eq!(
+            print_nulls(str_arr(acc.evaluate()?)?),
+            vec!["A", "B", "C", "D"]
+        );
+
+        // Partial retract from front array: A leaves
+        acc.retract_batch(&[data(["A"])])?;
+        assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["B", "C", 
"D"]);
+
+        // Retract spanning two arrays: B, C (rest of first array) + D (second 
array)
+        acc.retract_batch(&[data(["B", "C", "D"])])?;
+        let result = acc.evaluate()?;
+        assert!(
+            matches!(&result, ScalarValue::List(arr) if arr.is_null(0)),
+            "expected null list after full retract, got {result:?}"
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn retract_with_nulls_preserved() -> Result<()> {
+        // ignore_nulls = false: NULLs are stored and counted for retract
+        let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?;
+
+        acc.update_batch(&[data([Some("A"), None, Some("C")])])?;
+        assert_eq!(
+            print_nulls(str_arr(acc.evaluate()?)?),
+            vec!["A", "NULL", "C"]
+        );
+
+        // Retract 2 elements: A and NULL both leave
+        acc.retract_batch(&[data([Some("A"), None])])?;
+        assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["C"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn retract_with_ignore_nulls() -> Result<()> {
+        // ignore_nulls = true: NULLs are NOT stored by update_batch,
+        // so retract must only count non-null values
+        let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, true)?;
+
+        // update_batch with [A, NULL, C] → stores only [A, C] (NULL filtered)
+        acc.update_batch(&[data([Some("A"), None, Some("C")])])?;
+        assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["A", "C"]);
+
+        // retract_batch receives the original values including NULL: [A, NULL]
+        // But only 1 non-null value (A) should be retracted
+        acc.retract_batch(&[data([Some("A"), None])])?;
+        assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["C"]);
+
+        // retract_batch with [NULL, C] — only C (1 non-null) retracted
+        acc.retract_batch(&[data([None, Some("C")])])?;
+        let result = acc.evaluate()?;
+        assert!(
+            matches!(&result, ScalarValue::List(arr) if arr.is_null(0)),
+            "expected null list after full retract, got {result:?}"
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn retract_ignore_nulls_all_nulls_batch() -> Result<()> {
+        // When ignore_nulls = true and retract batch is all NULLs, nothing is 
retracted
+        let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, true)?;
+
+        acc.update_batch(&[data([Some("A"), Some("B")])])?;
+        assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["A", "B"]);
+
+        // Retract batch of all NULLs: to_retract = 0, nothing changes
+        acc.retract_batch(&[data::<Option<&str>, 3>([None, None, None])])?;
+        assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["A", "B"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn retract_empty_accumulator() -> Result<()> {
+        let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?;
+
+        // Retract on empty accumulator should be a no-op
+        acc.retract_batch(&[data(["A"])])?;
+        let result = acc.evaluate()?;
+        assert!(
+            matches!(&result, ScalarValue::List(arr) if arr.is_null(0)),
+            "expected null list for empty accumulator, got {result:?}"
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn retract_front_offset_partial_consume() -> Result<()> {
+        // Reproduces the RANGE BETWEEN 2 PRECEDING AND 2 FOLLOWING scenario:
+        //   ts: 1, 2, 3, 4, 100
+        //
+        // Row 1 (ts=1): update [A,B,C] (3 elements, ts in [-1,3])
+        // Row 2 (ts=2): update [D]     (ts=4 enters)
+        // Row 3 (ts=3): no change      (same frame [0..4))
+        // Row 4 (ts=4): retract [A]    (ts=1 leaves, partial consume)
+        // Row 5 (ts=100): retract [B,C,D] (3-element retract spanning arrays)
+        let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?;
+
+        // Row 1: update_batch(["A","B","C"])
+        acc.update_batch(&[data(["A", "B", "C"])])?;
+        assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["A", "B", 
"C"]);
+
+        // Row 2: update_batch(["D"])
+        acc.update_batch(&[data(["D"])])?;
+        assert_eq!(
+            print_nulls(str_arr(acc.evaluate()?)?),
+            vec!["A", "B", "C", "D"]
+        );
+
+        // Row 4: retract_batch(["A"]) — partial consume, front_offset = 1
+        acc.retract_batch(&[data(["A"])])?;
+        assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["B", "C", 
"D"]);
+
+        // Row 5: update_batch(["E"]), then retract_batch(["B","C","D"])
+        // retract spans: ["A","B","C"] (offset=1, 2 remaining) + ["D"] (1 
element)
+        acc.update_batch(&[data(["E"])])?;
+        acc.retract_batch(&[data(["B", "C", "D"])])?;
+        assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["E"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn retract_update_after_full_drain() -> Result<()> {
+        // Verify accumulator works correctly after being fully drained
+        let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?;
+
+        acc.update_batch(&[data(["A", "B"])])?;
+        acc.retract_batch(&[data(["A", "B"])])?;
+
+        // Accumulator is empty now
+        let result = acc.evaluate()?;
+        assert!(
+            matches!(&result, ScalarValue::List(arr) if arr.is_null(0)),
+            "expected null list, got {result:?}"
+        );
+
+        // New values should work normally after drain
+        acc.update_batch(&[data(["X", "Y"])])?;
+        assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["X", "Y"]);
+
+        acc.retract_batch(&[data(["X"])])?;
+        assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["Y"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn retract_supports_retract_batch() -> Result<()> {
+        let acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?;
+        assert!(acc.supports_retract_batch());
+
+        let acc_ignore = ArrayAggAccumulator::try_new(&DataType::Utf8, true)?;
+        assert!(acc_ignore.supports_retract_batch());
+
+        Ok(())
+    }
+
+    #[test]
+    fn retract_ignore_nulls_logical_vs_physical() -> Result<()> {
+        // Regression test: DictionaryArray where logical nulls differ from 
physical nulls.
+        // Manually construct a DictionaryArray where all indices are valid
+        // (physical null_count = 0) but some point to null dictionary values
+        // (logical_null_count > 0).
+        use arrow::array::{DictionaryArray, Int32Array, StringArray};
+
+        let dict_type =
+            DataType::Dictionary(Box::new(DataType::Int32), 
Box::new(DataType::Utf8));
+        let mut acc = ArrayAggAccumulator::try_new(&dict_type, true)?;
+
+        // Dictionary values: ["hello", NULL, "world"]
+        // Keys: [0, 1, 2, 1] — all valid, but keys 1 and 3 point to null value
+        let values = StringArray::from(vec![Some("hello"), None, 
Some("world")]);
+        let keys = Int32Array::from(vec![0, 1, 2, 1]);
+        let dict_array: ArrayRef = Arc::new(DictionaryArray::new(keys, 
Arc::new(values)));
+
+        // Confirm the divergence this test exists to exercise
+        assert_eq!(
+            dict_array.null_count(),
+            0,
+            "physical nulls: none in keys bitmap"
+        );
+        assert_eq!(
+            dict_array.logical_null_count(),
+            2,
+            "logical nulls: keys pointing to null values"
+        );
+
+        // update_batch uses logical_nulls() → stores only ["hello", "world"]
+        acc.update_batch(std::slice::from_ref(&dict_array))?;
+
+        // Verify 2 elements stored
+        let result = acc.evaluate()?;
+        match &result {
+            ScalarValue::List(arr) => {
+                let values = arr.value(0);
+                assert_eq!(values.len(), 2);
+            }
+            other => panic!("expected List, got {other:?}"),
+        }
+
+        // retract_batch with same array: should retract 2 (logical 
non-nulls), not 4 (len) or 0 (physical non-nulls would be len-0=4)
+        acc.retract_batch(&[dict_array])?;
+        let result = acc.evaluate()?;
+        assert!(
+            matches!(&result, ScalarValue::List(arr) if arr.is_null(0)),
+            "expected null list after full retract, got {result:?}"
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn retract_ignore_nulls_dict_partial() -> Result<()> {
+        // Partial retraction with DictionaryArray where logical != physical 
nulls.
+        // Manually construct so keys are all valid but some point to null 
values.
+        use arrow::array::{DictionaryArray, Int32Array, StringArray};
+
+        let dict_type =
+            DataType::Dictionary(Box::new(DataType::Int32), 
Box::new(DataType::Utf8));
+        let mut acc = ArrayAggAccumulator::try_new(&dict_type, true)?;
+
+        // update with ["A", "B", "C"] (no nulls)
+        let values = StringArray::from(vec!["A", "B", "C"]);
+        let keys = Int32Array::from(vec![0, 1, 2]);
+        let update_array: ArrayRef =
+            Arc::new(DictionaryArray::new(keys, Arc::new(values)));
+        acc.update_batch(&[update_array])?;
+
+        // retract with dict ["A", NULL, NULL]:
+        //   keys [0, 1, 1] all valid → physical null_count = 0
+        //   keys 1,2 point to null value → logical_null_count = 2
+        //   non-null count = 3 - 2 = 1 → retract 1 element
+        let values = StringArray::from(vec![Some("A"), None]);
+        let keys = Int32Array::from(vec![0, 1, 1]);
+        let retract_array: ArrayRef =
+            Arc::new(DictionaryArray::new(keys, Arc::new(values)));
+
+        assert_eq!(
+            retract_array.null_count(),
+            0,
+            "physical nulls: none in keys bitmap"
+        );
+        assert_eq!(
+            retract_array.logical_null_count(),
+            2,
+            "logical nulls: keys pointing to null values"
+        );
+
+        acc.retract_batch(&[retract_array])?;
+
+        // Should have retracted only 1 element, leaving ["B", "C"]
+        let result = acc.evaluate()?;
+        match &result {
+            ScalarValue::List(arr) => {
+                let values = arr.value(0);
+                assert_eq!(values.len(), 2);
+            }
+            other => panic!("expected List with 2 elements, got {other:?}"),
+        }
+
+        Ok(())
+    }
 }
diff --git a/datafusion/sqllogictest/test_files/array_agg_sliding_window.slt 
b/datafusion/sqllogictest/test_files/array_agg_sliding_window.slt
new file mode 100644
index 0000000000..78d48513a6
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/array_agg_sliding_window.slt
@@ -0,0 +1,185 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#######
+# Tests for array_agg with sliding (bounded) window frames.
+# Validates the retract_batch implementation on ArrayAggAccumulator.
+#######
+
+# Setup test data
+statement ok
+CREATE TABLE t(ts INT, val TEXT) AS VALUES
+  (1, 'A'), (2, 'B'), (3, 'C'), (4, 'D'), (5, 'E');
+
+# Basic ROWS sliding window
+query ?
+SELECT array_agg(val) OVER (ORDER BY ts ROWS BETWEEN 1 PRECEDING AND CURRENT 
ROW)
+FROM t;
+----
+[A]
+[A, B]
+[B, C]
+[C, D]
+[D, E]
+
+# Wider ROWS frame
+query ?
+SELECT array_agg(val) OVER (ORDER BY ts ROWS BETWEEN 2 PRECEDING AND CURRENT 
ROW)
+FROM t;
+----
+[A]
+[A, B]
+[A, B, C]
+[B, C, D]
+[C, D, E]
+
+# ROWS with FOLLOWING
+query ?
+SELECT array_agg(val) OVER (ORDER BY ts ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING)
+FROM t;
+----
+[A, B]
+[A, B, C]
+[B, C, D]
+[C, D, E]
+[D, E]
+
+# Regression: unbounded frame still works
+query ?
+SELECT array_agg(val) OVER (ORDER BY ts ROWS BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW)
+FROM t;
+----
+[A]
+[A, B]
+[A, B, C]
+[A, B, C, D]
+[A, B, C, D, E]
+
+# Setup data with NULLs
+statement ok
+CREATE TABLE t_nulls(ts INT, val TEXT) AS VALUES
+  (1, 'A'), (2, NULL), (3, 'C'), (4, NULL), (5, 'E');
+
+# Sliding window with NULLs (nulls preserved)
+query ?
+SELECT array_agg(val) OVER (ORDER BY ts ROWS BETWEEN 1 PRECEDING AND CURRENT 
ROW)
+FROM t_nulls;
+----
+[A]
+[A, NULL]
+[NULL, C]
+[C, NULL]
+[NULL, E]
+
+# Setup data with value gaps for RANGE frame
+statement ok
+CREATE TABLE t_range(ts INT, val TEXT) AS VALUES
+  (1, 'A'), (2, 'B'), (3, 'C'), (4, 'D'), (100, 'E');
+
+# RANGE frame with value gap causes multi-element retract
+query ?
+SELECT array_agg(val) OVER (ORDER BY ts RANGE BETWEEN 2 PRECEDING AND 2 
FOLLOWING)
+FROM t_range;
+----
+[A, B, C]
+[A, B, C, D]
+[A, B, C, D]
+[B, C, D]
+[E]
+
+# Single-row frame
+query ?
+SELECT array_agg(val) OVER (ORDER BY ts ROWS BETWEEN CURRENT ROW AND CURRENT 
ROW)
+FROM t;
+----
+[A]
+[B]
+[C]
+[D]
+[E]
+
+# Integer values in sliding window
+statement ok
+CREATE TABLE t_int(ts INT, val INT) AS VALUES
+  (1, 10), (2, 20), (3, 30), (4, 40), (5, 50);
+
+query ?
+SELECT array_agg(val) OVER (ORDER BY ts ROWS BETWEEN 1 PRECEDING AND CURRENT 
ROW)
+FROM t_int;
+----
+[10]
+[10, 20]
+[20, 30]
+[30, 40]
+[40, 50]
+
+# Setup data with duplicate sort keys for GROUPS frame
+statement ok
+CREATE TABLE t_groups(ts INT, val TEXT) AS VALUES
+  (1, 'A'), (1, 'B'), (2, 'C'), (2, 'D'), (3, 'E');
+
+# GROUPS frame: each "group" = rows with same ORDER BY value
+# Group 0: ts=1 (A,B), Group 1: ts=2 (C,D), Group 2: ts=3 (E)
+# Frame: 1 PRECEDING group + CURRENT group
+query ?
+SELECT array_agg(val) OVER (ORDER BY ts GROUPS BETWEEN 1 PRECEDING AND CURRENT 
ROW)
+FROM t_groups;
+----
+[A, B]
+[A, B]
+[A, B, C, D]
+[A, B, C, D]
+[C, D, E]
+
+# IGNORE NULLS: nulls are filtered out before storage and retract
+# t_nulls has: (1,'A'), (2,NULL), (3,'C'), (4,NULL), (5,'E')
+query ?
+SELECT array_agg(val) IGNORE NULLS OVER (ORDER BY ts ROWS BETWEEN 1 PRECEDING 
AND CURRENT ROW)
+FROM t_nulls;
+----
+[A]
+[A]
+[C]
+[C]
+[E]
+
+# IGNORE NULLS with wider frame
+query ?
+SELECT array_agg(val) IGNORE NULLS OVER (ORDER BY ts ROWS BETWEEN 2 PRECEDING 
AND CURRENT ROW)
+FROM t_nulls;
+----
+[A]
+[A]
+[A, C]
+[C]
+[C, E]
+
+# Cleanup
+statement ok
+DROP TABLE t;
+
+statement ok
+DROP TABLE t_nulls;
+
+statement ok
+DROP TABLE t_range;
+
+statement ok
+DROP TABLE t_int;
+
+statement ok
+DROP TABLE t_groups;
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to