This is an automated email from the ASF dual-hosted git repository.

comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 356a307c37 Add support for ignore nulls for LEAD, LAG in WindowAggExec (#9498)
356a307c37 is described below

commit 356a307c371c43d3a2c61a7aa49eb83246e8ca92
Author: Lordworms <[email protected]>
AuthorDate: Sat Mar 9 13:48:04 2024 -0600

    Add support for ignore nulls for LEAD, LAG in WindowAggExec (#9498)
    
    * adding prototype
    
    a
    
    * fix clippy
    
    * modify test
    
    * fix clippy
    
    * fix:clippy
    
    * optimize code
    
    * change tests
---
 datafusion/physical-expr/src/window/lead_lag.rs | 72 +++++++++++++++++++++----
 datafusion/sqllogictest/test_files/window.slt   | 46 ++++++++++++----
 2 files changed, 99 insertions(+), 19 deletions(-)

diff --git a/datafusion/physical-expr/src/window/lead_lag.rs b/datafusion/physical-expr/src/window/lead_lag.rs
index da9410cdbe..2b77e57161 100644
--- a/datafusion/physical-expr/src/window/lead_lag.rs
+++ b/datafusion/physical-expr/src/window/lead_lag.rs
@@ -17,15 +17,12 @@
 
 //! Defines physical expression for `lead` and `lag` that can evaluated
 //! at runtime during query execution
-
 use crate::window::BuiltInWindowFunctionExpr;
 use crate::PhysicalExpr;
 use arrow::array::ArrayRef;
 use arrow::datatypes::{DataType, Field};
 use arrow_array::Array;
-use datafusion_common::{
-    arrow_datafusion_err, exec_datafusion_err, DataFusionError, Result, ScalarValue,
-};
+use datafusion_common::{arrow_datafusion_err, DataFusionError, Result, ScalarValue};
 use datafusion_expr::PartitionEvaluator;
 use std::any::Any;
 use std::cmp::min;
@@ -151,6 +148,57 @@ impl WindowShiftEvaluator {
     }
 }
 
+// implement ignore null for evaluate_all
+fn evaluate_all_with_ignore_null(
+    array: &ArrayRef,
+    offset: i64,
+    default_value: &ScalarValue,
+    is_lag: bool,
+) -> Result<ArrayRef, DataFusionError> {
+    let valid_indices: Vec<usize> =
+        array.nulls().unwrap().valid_indices().collect::<Vec<_>>();
+    let direction = !is_lag;
+    let new_array_results: Result<Vec<_>, DataFusionError> = (0..array.len())
+        .map(|id| {
+            let result_index = match valid_indices.binary_search(&id) {
+                Ok(pos) => if direction {
+                    pos.checked_add(offset as usize)
+                } else {
+                    pos.checked_sub(offset.unsigned_abs() as usize)
+                }
+                .and_then(|new_pos| {
+                    if new_pos < valid_indices.len() {
+                        Some(valid_indices[new_pos])
+                    } else {
+                        None
+                    }
+                }),
+                Err(pos) => if direction {
+                    pos.checked_add(offset as usize)
+                } else if pos > 0 {
+                    pos.checked_sub(offset.unsigned_abs() as usize)
+                } else {
+                    None
+                }
+                .and_then(|new_pos| {
+                    if new_pos < valid_indices.len() {
+                        Some(valid_indices[new_pos])
+                    } else {
+                        None
+                    }
+                }),
+            };
+
+            match result_index {
+                Some(index) => ScalarValue::try_from_array(array, index),
+                None => Ok(default_value.clone()),
+            }
+        })
+        .collect();
+
+    let new_array = new_array_results?;
+    ScalarValue::iter_to_array(new_array)
+}
 // TODO: change the original arrow::compute::kernels::window::shift impl to support an optional default value
 fn shift_with_default_value(
     array: &ArrayRef,
@@ -326,14 +374,18 @@ impl PartitionEvaluator for WindowShiftEvaluator {
         values: &[ArrayRef],
         _num_rows: usize,
     ) -> Result<ArrayRef> {
-        if self.ignore_nulls {
-            return Err(exec_datafusion_err!(
-                "IGNORE NULLS mode for LAG and LEAD is not supported for WindowAggExec"
-            ));
-        }
         // LEAD, LAG window functions take single column, values will have size 1
         let value = &values[0];
-        shift_with_default_value(value, self.shift_offset, &self.default_value)
+        if !self.ignore_nulls {
+            shift_with_default_value(value, self.shift_offset, &self.default_value)
+        } else {
+            evaluate_all_with_ignore_null(
+                value,
+                self.shift_offset,
+                &self.default_value,
+                self.is_lag(),
+            )
+        }
     }
 
     fn supports_bounded_execution(&self) -> bool {
diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt
index c7241cae30..3525f59142 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -4133,18 +4133,35 @@ x x x x x x
 x x x NULL NULL NULL
 b b b b b b
 
-# LAG window function IGNORE/RESPECT NULLS support with ascending order and nondefault offset
-query TTTT
+# LAG window function IGNORE/RESPECT NULLS support with descending order and default offset 1 trigger evaluate_all
+query TTTTTTI
+select lag(a) ignore nulls over (order by id desc) as x,
+       lag(a, 1, null) ignore nulls over (order by id desc) as x1,
+       lag(a, 1, 'def') ignore nulls over (order by id desc) as x2,
+       lag(a) respect nulls over (order by id desc) as x3,
+       lag(a, 1, null) respect nulls over (order by id desc) as x4,
+       lag(a, 1, 'def') respect nulls over (order by id desc) as x5,
+       sum(id) over (order by id desc ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as sum_id
+from (select 2 id, 'b' a union all select 1 id, null a union all select 3 id, null union all select 4 id, 'x')
+----
+NULL NULL def NULL NULL def 10
+x x x x x x 10
+x x x NULL NULL NULL 10
+b b b b b b 10
+
+# LAG window function IGNORE/RESPECT NULLS support with ascending order and nondefault offset trigger evaluate_all
+query TTTTI
 select lag(a, 2, null) ignore nulls over (order by id) as x1,
        lag(a, 2, 'def') ignore nulls over (order by id) as x2,
        lag(a, 2, null) respect nulls over (order by id) as x4,
-       lag(a, 2, 'def') respect nulls over (order by id) as x5
+       lag(a, 2, 'def') respect nulls over (order by id) as x5,
+       sum(id) over (order by id desc ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as sum_id
 from (select 2 id, 'b' a union all select 1 id, null a union all select 3 id, null union all select 4 id, 'x')
 ----
-NULL def NULL def
-NULL def NULL def
-NULL def NULL NULL
-NULL def b b
+NULL def b b 10
+NULL def NULL NULL 10
+NULL def NULL def 10
+NULL def NULL def 10
 
 # LAG window function IGNORE/RESPECT NULLS support with descending order and nondefault offset
 query TTTT
@@ -4161,13 +4178,18 @@ x x NULL NULL
 
 # LAG window function IGNORE/RESPECT NULLS support with descending order and nondefault offset.
 # To trigger WindowAggExec, we added a sum window function with all of the ranges.
-statement error Execution error: IGNORE NULLS mode for LAG and LEAD is not supported for WindowAggExec
+query TTTTI
 select lag(a, 2, null) ignore nulls over (order by id desc) as x1,
        lag(a, 2, 'def') ignore nulls over (order by id desc) as x2,
        lag(a, 2, null) respect nulls over (order by id desc) as x4,
        lag(a, 2, 'def') respect nulls over (order by id desc) as x5,
        sum(id) over (order by id desc ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as sum_id
 from (select 2 id, 'b' a union all select 1 id, null a union all select 3 id, null union all select 4 id, 'x')
+----
+NULL def NULL def 10
+NULL def NULL def 10
+NULL def x x 10
+x x NULL NULL 10
 
 # LEAD window function IGNORE/RESPECT NULLS support with ascending order and default offset 1
 query TTTTTT
@@ -4227,13 +4249,19 @@ NULL def NULL def
 
 # LEAD window function IGNORE/RESPECT NULLS support with descending order and nondefault offset.
 # To trigger WindowAggExec, we added a sum window function with all of the ranges.
-statement error Execution error: IGNORE NULLS mode for LAG and LEAD is not supported for WindowAggExec
+query TTTTI
 select lead(a, 2, null) ignore nulls over (order by id desc) as x1,
        lead(a, 2, 'def') ignore nulls over (order by id desc) as x2,
        lead(a, 2, null) respect nulls over (order by id desc) as x4,
        lead(a, 2, 'def') respect nulls over (order by id desc) as x5,
        sum(id) over (order by id desc ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as sum_id
 from (select 2 id, 'b' a union all select 1 id, null a union all select 3 id, null union all select 4 id, 'x')
+----
+NULL def b b 10
+NULL def NULL NULL 10
+NULL def NULL def 10
+NULL def NULL def 10
+
 
 statement ok
 set datafusion.execution.batch_size = 1000;

Reply via email to