This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 09aea09b6 Refactor Builtin Window Function Implementation (#4441)
09aea09b6 is described below

commit 09aea09b6e7bd2950c0f57296ccb1a1995570057
Author: Mustafa akur <[email protected]>
AuthorDate: Thu Dec 1 18:43:57 2022 +0300

    Refactor Builtin Window Function Implementation (#4441)
---
 datafusion/physical-expr/src/window/aggregate.rs   | 24 +++--------
 datafusion/physical-expr/src/window/built_in.rs    | 27 +++++-------
 .../src/window/built_in_window_function_expr.rs    | 16 ++++++--
 datafusion/physical-expr/src/window/cume_dist.rs   | 36 ++++------------
 datafusion/physical-expr/src/window/lead_lag.rs    | 26 +++++-------
 datafusion/physical-expr/src/window/nth_value.rs   | 36 ++++++----------
 .../src/window/partition_evaluator.rs              | 24 +++++++++--
 datafusion/physical-expr/src/window/rank.rs        | 48 ++++------------------
 datafusion/physical-expr/src/window/row_number.rs  | 22 ++++++----
 datafusion/physical-expr/src/window/window_expr.rs | 27 +++++++++---
 10 files changed, 122 insertions(+), 164 deletions(-)

diff --git a/datafusion/physical-expr/src/window/aggregate.rs 
b/datafusion/physical-expr/src/window/aggregate.rs
index 69b0812c1..17a19b6d7 100644
--- a/datafusion/physical-expr/src/window/aggregate.rs
+++ b/datafusion/physical-expr/src/window/aggregate.rs
@@ -87,15 +87,9 @@ impl WindowExpr for AggregateWindowExpr {
         let partition_columns = self.partition_columns(batch)?;
         let partition_points =
             self.evaluate_partition_points(batch.num_rows(), 
&partition_columns)?;
-        let values = self.evaluate_args(batch)?;
-
         let sort_options: Vec<SortOptions> =
             self.order_by.iter().map(|o| o.options).collect();
-        let columns = self.sort_columns(batch)?;
-        let order_columns: Vec<&ArrayRef> = columns.iter().map(|s| 
&s.values).collect();
-        // Sort values, this will make the same partitions consecutive. Also, 
within the partition
-        // range, values will be sorted.
-        let order_bys = &order_columns[self.partition_by.len()..];
+        let (_, order_bys) = self.get_values_orderbys(batch)?;
         let window_frame = if !order_bys.is_empty() && 
self.window_frame.is_none() {
             // OVER (ORDER BY a) case
             // We create an implicit window for ORDER BY.
@@ -107,14 +101,8 @@ impl WindowExpr for AggregateWindowExpr {
         for partition_range in &partition_points {
             let mut accumulator = self.aggregate.create_accumulator()?;
             let length = partition_range.end - partition_range.start;
-            let slice_order_bys = order_bys
-                .iter()
-                .map(|v| v.slice(partition_range.start, length))
-                .collect::<Vec<_>>();
-            let value_slice = values
-                .iter()
-                .map(|v| v.slice(partition_range.start, length))
-                .collect::<Vec<_>>();
+            let (values, order_bys) =
+                self.get_values_orderbys(&batch.slice(partition_range.start, 
length))?;
 
             let mut window_frame_ctx = WindowFrameContext::new(&window_frame);
             let mut last_range: (usize, usize) = (0, 0);
@@ -123,7 +111,7 @@ impl WindowExpr for AggregateWindowExpr {
             // First, cur_range is calculated, then it is compared with 
last_range.
             for i in 0..length {
                 let cur_range = window_frame_ctx.calculate_range(
-                    &slice_order_bys,
+                    &order_bys,
                     &sort_options,
                     length,
                     i,
@@ -135,7 +123,7 @@ impl WindowExpr for AggregateWindowExpr {
                     // Accumulate any new rows that have entered the window:
                     let update_bound = cur_range.1 - last_range.1;
                     if update_bound > 0 {
-                        let update: Vec<ArrayRef> = value_slice
+                        let update: Vec<ArrayRef> = values
                             .iter()
                             .map(|v| v.slice(last_range.1, update_bound))
                             .collect();
@@ -144,7 +132,7 @@ impl WindowExpr for AggregateWindowExpr {
                     // Remove rows that have now left the window:
                     let retract_bound = cur_range.0 - last_range.0;
                     if retract_bound > 0 {
-                        let retract: Vec<ArrayRef> = value_slice
+                        let retract: Vec<ArrayRef> = values
                             .iter()
                             .map(|v| v.slice(last_range.0, retract_bound))
                             .collect();
diff --git a/datafusion/physical-expr/src/window/built_in.rs 
b/datafusion/physical-expr/src/window/built_in.rs
index da551d534..adf8d5b34 100644
--- a/datafusion/physical-expr/src/window/built_in.rs
+++ b/datafusion/physical-expr/src/window/built_in.rs
@@ -21,7 +21,6 @@ use super::window_frame_state::WindowFrameContext;
 use super::BuiltInWindowFunctionExpr;
 use super::WindowExpr;
 use crate::{expressions::PhysicalSortExpr, PhysicalExpr};
-use arrow::array::Array;
 use arrow::compute::{concat, SortOptions};
 use arrow::record_batch::RecordBatch;
 use arrow::{array::ArrayRef, datatypes::Field};
@@ -85,7 +84,7 @@ impl WindowExpr for BuiltInWindowExpr {
     }
 
     fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
-        let evaluator = self.expr.create_evaluator(batch)?;
+        let evaluator = self.expr.create_evaluator()?;
         let num_rows = batch.num_rows();
         let partition_columns = self.partition_columns(batch)?;
         let partition_points =
@@ -94,12 +93,7 @@ impl WindowExpr for BuiltInWindowExpr {
         let results = if evaluator.uses_window_frame() {
             let sort_options: Vec<SortOptions> =
                 self.order_by.iter().map(|o| o.options).collect();
-            let columns = self.sort_columns(batch)?;
-            let order_columns: Vec<&ArrayRef> =
-                columns.iter().map(|s| &s.values).collect();
-            // Sort values, this will make the same partitions consecutive. 
Also, within the partition
-            // range, values will be sorted.
-            let order_bys = &order_columns[self.partition_by.len()..];
+            let (_, order_bys) = self.get_values_orderbys(batch)?;
             let window_frame = if !order_bys.is_empty() && 
self.window_frame.is_none() {
                 // OVER (ORDER BY a) case
                 // We create an implicit window for ORDER BY.
@@ -110,24 +104,22 @@ impl WindowExpr for BuiltInWindowExpr {
             let mut row_wise_results = vec![];
             for partition_range in &partition_points {
                 let length = partition_range.end - partition_range.start;
-                let slice_order_bys = order_bys
-                    .iter()
-                    .map(|v| v.slice(partition_range.start, length))
-                    .collect::<Vec<_>>();
+                let (values, order_bys) = self
+                    .get_values_orderbys(&batch.slice(partition_range.start, 
length))?;
                 let mut window_frame_ctx = 
WindowFrameContext::new(&window_frame);
                 // We iterate on each row to calculate window frame range and 
and window function result
                 for idx in 0..length {
                     let range = window_frame_ctx.calculate_range(
-                        &slice_order_bys,
+                        &order_bys,
                         &sort_options,
                         num_rows,
                         idx,
                     )?;
                     let range = Range {
-                        start: partition_range.start + range.0,
-                        end: partition_range.start + range.1,
+                        start: range.0,
+                        end: range.1,
                     };
-                    let value = evaluator.evaluate_inside_range(range)?;
+                    let value = evaluator.evaluate_inside_range(&values, 
range)?;
                     row_wise_results.push(value.to_array());
                 }
             }
@@ -138,7 +130,8 @@ impl WindowExpr for BuiltInWindowExpr {
                 self.evaluate_partition_points(num_rows, &columns)?;
             evaluator.evaluate_with_rank(partition_points, 
sort_partition_points)?
         } else {
-            evaluator.evaluate(partition_points)?
+            let (values, _) = self.get_values_orderbys(batch)?;
+            evaluator.evaluate(&values, partition_points)?
         };
         let results = results.iter().map(|i| i.as_ref()).collect::<Vec<_>>();
         concat(&results).map_err(DataFusionError::ArrowError)
diff --git 
a/datafusion/physical-expr/src/window/built_in_window_function_expr.rs 
b/datafusion/physical-expr/src/window/built_in_window_function_expr.rs
index 43e1272bc..7f7a27435 100644
--- a/datafusion/physical-expr/src/window/built_in_window_function_expr.rs
+++ b/datafusion/physical-expr/src/window/built_in_window_function_expr.rs
@@ -17,6 +17,7 @@
 
 use super::partition_evaluator::PartitionEvaluator;
 use crate::PhysicalExpr;
+use arrow::array::ArrayRef;
 use arrow::datatypes::Field;
 use arrow::record_batch::RecordBatch;
 use datafusion_common::Result;
@@ -45,9 +46,16 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync + 
std::fmt::Debug {
         "BuiltInWindowFunctionExpr: default name"
     }
 
+    /// Evaluate window function arguments against the batch and return one
+    /// array per argument. Typically, the result is a single element vector.
+    fn evaluate_args(&self, batch: &RecordBatch) -> Result<Vec<ArrayRef>> {
+        self.expressions()
+            .iter()
+            .map(|e| e.evaluate(batch))
+            .map(|r| r.map(|v| v.into_array(batch.num_rows())))
+            .collect()
+    }
+
     /// Create built-in window evaluator with a batch
-    fn create_evaluator(
-        &self,
-        batch: &RecordBatch,
-    ) -> Result<Box<dyn PartitionEvaluator>>;
+    fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>>;
 }
diff --git a/datafusion/physical-expr/src/window/cume_dist.rs 
b/datafusion/physical-expr/src/window/cume_dist.rs
index 922788816..4202058a3 100644
--- a/datafusion/physical-expr/src/window/cume_dist.rs
+++ b/datafusion/physical-expr/src/window/cume_dist.rs
@@ -24,7 +24,6 @@ use crate::PhysicalExpr;
 use arrow::array::ArrayRef;
 use arrow::array::Float64Array;
 use arrow::datatypes::{DataType, Field};
-use arrow::record_batch::RecordBatch;
 use datafusion_common::Result;
 use std::any::Any;
 use std::iter;
@@ -62,10 +61,7 @@ impl BuiltInWindowFunctionExpr for CumeDist {
         &self.name
     }
 
-    fn create_evaluator(
-        &self,
-        _batch: &RecordBatch,
-    ) -> Result<Box<dyn PartitionEvaluator>> {
+    fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
         Ok(Box::new(CumeDistEvaluator {}))
     }
 }
@@ -77,12 +73,6 @@ impl PartitionEvaluator for CumeDistEvaluator {
         true
     }
 
-    fn evaluate_partition(&self, _partition: Range<usize>) -> Result<ArrayRef> 
{
-        unreachable!(
-            "cume_dist evaluation must be called with 
evaluate_partition_with_rank"
-        )
-    }
-
     fn evaluate_partition_with_rank(
         &self,
         partition: Range<usize>,
@@ -108,22 +98,16 @@ impl PartitionEvaluator for CumeDistEvaluator {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use arrow::{array::*, datatypes::*};
     use datafusion_common::cast::as_float64_array;
 
     fn test_i32_result(
         expr: &CumeDist,
-        data: Vec<i32>,
         partition: Range<usize>,
         ranks: Vec<Range<usize>>,
         expected: Vec<f64>,
     ) -> Result<()> {
-        let arr: ArrayRef = Arc::new(Int32Array::from(data));
-        let values = vec![arr];
-        let schema = Schema::new(vec![Field::new("arr", DataType::Int32, 
false)]);
-        let batch = RecordBatch::try_new(Arc::new(schema), values.clone())?;
         let result = expr
-            .create_evaluator(&batch)?
+            .create_evaluator()?
             .evaluate_with_rank(vec![partition], ranks)?;
         assert_eq!(1, result.len());
         let result = as_float64_array(&result[0])?;
@@ -137,25 +121,19 @@ mod tests {
         let r = cume_dist("arr".into());
 
         let expected = vec![0.0; 0];
-        test_i32_result(&r, vec![], 0..0, vec![], expected)?;
+        test_i32_result(&r, 0..0, vec![], expected)?;
 
         let expected = vec![1.0; 1];
-        test_i32_result(&r, vec![20; 1], 0..1, vec![0..1], expected)?;
+        test_i32_result(&r, 0..1, vec![0..1], expected)?;
 
         let expected = vec![1.0; 2];
-        test_i32_result(&r, vec![20; 2], 0..2, vec![0..2], expected)?;
+        test_i32_result(&r, 0..2, vec![0..2], expected)?;
 
         let expected = vec![0.5, 0.5, 1.0, 1.0];
-        test_i32_result(&r, vec![1, 1, 2, 2], 0..4, vec![0..2, 2..4], 
expected)?;
+        test_i32_result(&r, 0..4, vec![0..2, 2..4], expected)?;
 
         let expected = vec![0.25, 0.5, 0.75, 1.0];
-        test_i32_result(
-            &r,
-            vec![1, 2, 4, 5],
-            0..4,
-            vec![0..1, 1..2, 2..3, 3..4],
-            expected,
-        )?;
+        test_i32_result(&r, 0..4, vec![0..1, 1..2, 2..3, 3..4], expected)?;
 
         Ok(())
     }
diff --git a/datafusion/physical-expr/src/window/lead_lag.rs 
b/datafusion/physical-expr/src/window/lead_lag.rs
index 860df716a..ef4175d13 100644
--- a/datafusion/physical-expr/src/window/lead_lag.rs
+++ b/datafusion/physical-expr/src/window/lead_lag.rs
@@ -24,7 +24,6 @@ use crate::PhysicalExpr;
 use arrow::array::ArrayRef;
 use arrow::compute::cast;
 use arrow::datatypes::{DataType, Field};
-use arrow::record_batch::RecordBatch;
 use datafusion_common::ScalarValue;
 use datafusion_common::{DataFusionError, Result};
 use std::any::Any;
@@ -95,19 +94,9 @@ impl BuiltInWindowFunctionExpr for WindowShift {
         &self.name
     }
 
-    fn create_evaluator(
-        &self,
-        batch: &RecordBatch,
-    ) -> Result<Box<dyn PartitionEvaluator>> {
-        let values = self
-            .expressions()
-            .iter()
-            .map(|e| e.evaluate(batch))
-            .map(|r| r.map(|v| v.into_array(batch.num_rows())))
-            .collect::<Result<Vec<_>>>()?;
+    fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
         Ok(Box::new(WindowShiftEvaluator {
             shift_offset: self.shift_offset,
-            values,
             default_value: self.default_value.clone(),
         }))
     }
@@ -115,7 +104,6 @@ impl BuiltInWindowFunctionExpr for WindowShift {
 
 pub(crate) struct WindowShiftEvaluator {
     shift_offset: i64,
-    values: Vec<ArrayRef>,
     default_value: Option<ScalarValue>,
 }
 
@@ -169,8 +157,13 @@ fn shift_with_default_value(
 }
 
 impl PartitionEvaluator for WindowShiftEvaluator {
-    fn evaluate_partition(&self, partition: Range<usize>) -> Result<ArrayRef> {
-        let value = &self.values[0];
+    fn evaluate_partition(
+        &self,
+        values: &[ArrayRef],
+        partition: Range<usize>,
+    ) -> Result<ArrayRef> {
+        // LEAD, LAG window functions take single column, values will have 
size 1
+        let value = &values[0];
         let value = value.slice(partition.start, partition.end - 
partition.start);
         shift_with_default_value(&value, self.shift_offset, 
self.default_value.as_ref())
     }
@@ -190,7 +183,8 @@ mod tests {
         let values = vec![arr];
         let schema = Schema::new(vec![Field::new("arr", DataType::Int32, 
false)]);
         let batch = RecordBatch::try_new(Arc::new(schema), values.clone())?;
-        let result = expr.create_evaluator(&batch)?.evaluate(vec![0..8])?;
+        let values = expr.evaluate_args(&batch)?;
+        let result = expr.create_evaluator()?.evaluate(&values, vec![0..8])?;
         assert_eq!(1, result.len());
         let result = as_int32_array(&result[0])?;
         assert_eq!(expected, *result);
diff --git a/datafusion/physical-expr/src/window/nth_value.rs 
b/datafusion/physical-expr/src/window/nth_value.rs
index e9988032c..e0afb520e 100644
--- a/datafusion/physical-expr/src/window/nth_value.rs
+++ b/datafusion/physical-expr/src/window/nth_value.rs
@@ -23,7 +23,6 @@ use crate::window::BuiltInWindowFunctionExpr;
 use crate::PhysicalExpr;
 use arrow::array::{Array, ArrayRef};
 use arrow::datatypes::{DataType, Field};
-use arrow::record_batch::RecordBatch;
 use datafusion_common::ScalarValue;
 use datafusion_common::{DataFusionError, Result};
 use std::any::Any;
@@ -116,27 +115,14 @@ impl BuiltInWindowFunctionExpr for NthValue {
         &self.name
     }
 
-    fn create_evaluator(
-        &self,
-        batch: &RecordBatch,
-    ) -> Result<Box<dyn PartitionEvaluator>> {
-        let values = self
-            .expressions()
-            .iter()
-            .map(|e| e.evaluate(batch))
-            .map(|r| r.map(|v| v.into_array(batch.num_rows())))
-            .collect::<Result<Vec<_>>>()?;
-        Ok(Box::new(NthValueEvaluator {
-            kind: self.kind,
-            values,
-        }))
+    fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
+        Ok(Box::new(NthValueEvaluator { kind: self.kind }))
     }
 }
 
 /// Value evaluator for nth_value functions
 pub(crate) struct NthValueEvaluator {
     kind: NthValueKind,
-    values: Vec<ArrayRef>,
 }
 
 impl PartitionEvaluator for NthValueEvaluator {
@@ -144,12 +130,13 @@ impl PartitionEvaluator for NthValueEvaluator {
         true
     }
 
-    fn evaluate_partition(&self, _partition: Range<usize>) -> Result<ArrayRef> 
{
-        unreachable!("first, last, and nth_value evaluation must be called 
with evaluate_partition_with_rank")
-    }
-
-    fn evaluate_inside_range(&self, range: Range<usize>) -> 
Result<ScalarValue> {
-        let arr = &self.values[0];
+    fn evaluate_inside_range(
+        &self,
+        values: &[ArrayRef],
+        range: Range<usize>,
+    ) -> Result<ScalarValue> {
+        // FIRST_VALUE, LAST_VALUE, NTH_VALUE window functions take single 
column, values will have size 1
+        let arr = &values[0];
         let n_range = range.end - range.start;
         match self.kind {
             NthValueKind::First => ScalarValue::try_from_array(arr, 
range.start),
@@ -188,10 +175,11 @@ mod tests {
                 end: i + 1,
             })
         }
-        let evaluator = expr.create_evaluator(&batch)?;
+        let evaluator = expr.create_evaluator()?;
+        let values = expr.evaluate_args(&batch)?;
         let result = ranges
             .into_iter()
-            .map(|range| evaluator.evaluate_inside_range(range))
+            .map(|range| evaluator.evaluate_inside_range(&values, range))
             .into_iter()
             .collect::<Result<Vec<ScalarValue>>>()?;
         let result = ScalarValue::iter_to_array(result.into_iter())?;
diff --git a/datafusion/physical-expr/src/window/partition_evaluator.rs 
b/datafusion/physical-expr/src/window/partition_evaluator.rs
index 4ecfd87a9..1608758d6 100644
--- a/datafusion/physical-expr/src/window/partition_evaluator.rs
+++ b/datafusion/physical-expr/src/window/partition_evaluator.rs
@@ -51,10 +51,14 @@ pub trait PartitionEvaluator {
     }
 
     /// evaluate the partition evaluator against the partitions
-    fn evaluate(&self, partition_points: Vec<Range<usize>>) -> 
Result<Vec<ArrayRef>> {
+    fn evaluate(
+        &self,
+        values: &[ArrayRef],
+        partition_points: Vec<Range<usize>>,
+    ) -> Result<Vec<ArrayRef>> {
         partition_points
             .into_iter()
-            .map(|partition| self.evaluate_partition(partition))
+            .map(|partition| self.evaluate_partition(values, partition))
             .collect()
     }
 
@@ -75,7 +79,15 @@ pub trait PartitionEvaluator {
     }
 
     /// evaluate the partition evaluator against the partition
-    fn evaluate_partition(&self, _partition: Range<usize>) -> Result<ArrayRef>;
+    fn evaluate_partition(
+        &self,
+        _values: &[ArrayRef],
+        _partition: Range<usize>,
+    ) -> Result<ArrayRef> {
+        Err(DataFusionError::NotImplemented(
+            "evaluate_partition is not implemented by default".into(),
+        ))
+    }
 
     /// evaluate the partition evaluator against the partition but with rank
     fn evaluate_partition_with_rank(
@@ -89,7 +101,11 @@ pub trait PartitionEvaluator {
     }
 
     /// evaluate window function result inside given range
-    fn evaluate_inside_range(&self, _range: Range<usize>) -> 
Result<ScalarValue> {
+    fn evaluate_inside_range(
+        &self,
+        _values: &[ArrayRef],
+        _range: Range<usize>,
+    ) -> Result<ScalarValue> {
         Err(DataFusionError::NotImplemented(
             "evaluate_inside_range is not implemented by default".into(),
         ))
diff --git a/datafusion/physical-expr/src/window/rank.rs 
b/datafusion/physical-expr/src/window/rank.rs
index ec9aca532..3447d47b3 100644
--- a/datafusion/physical-expr/src/window/rank.rs
+++ b/datafusion/physical-expr/src/window/rank.rs
@@ -24,7 +24,6 @@ use crate::PhysicalExpr;
 use arrow::array::ArrayRef;
 use arrow::array::{Float64Array, UInt64Array};
 use arrow::datatypes::{DataType, Field};
-use arrow::record_batch::RecordBatch;
 use datafusion_common::Result;
 use std::any::Any;
 use std::iter;
@@ -92,10 +91,7 @@ impl BuiltInWindowFunctionExpr for Rank {
         &self.name
     }
 
-    fn create_evaluator(
-        &self,
-        _batch: &RecordBatch,
-    ) -> Result<Box<dyn PartitionEvaluator>> {
+    fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
         Ok(Box::new(RankEvaluator {
             rank_type: self.rank_type,
         }))
@@ -111,10 +107,6 @@ impl PartitionEvaluator for RankEvaluator {
         true
     }
 
-    fn evaluate_partition(&self, _partition: Range<usize>) -> Result<ArrayRef> 
{
-        unreachable!("rank evaluation must be called with 
evaluate_partition_with_rank")
-    }
-
     fn evaluate_partition_with_rank(
         &self,
         partition: Range<usize>,
@@ -166,35 +158,24 @@ impl PartitionEvaluator for RankEvaluator {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use arrow::{array::*, datatypes::*};
     use datafusion_common::cast::{as_float64_array, as_uint64_array};
 
     fn test_with_rank(expr: &Rank, expected: Vec<u64>) -> Result<()> {
-        test_i32_result(
-            expr,
-            vec![-2, -2, 1, 3, 3, 3, 7, 8],
-            vec![0..2, 2..3, 3..6, 6..7, 7..8],
-            expected,
-        )
+        test_i32_result(expr, vec![0..2, 2..3, 3..6, 6..7, 7..8], expected)
     }
 
     fn test_without_rank(expr: &Rank, expected: Vec<u64>) -> Result<()> {
-        test_i32_result(expr, vec![-2, -2, 1, 3, 3, 3, 7, 8], vec![0..8], 
expected)
+        test_i32_result(expr, vec![0..8], expected)
     }
 
     fn test_f64_result(
         expr: &Rank,
-        data: Vec<i32>,
         range: Range<usize>,
         ranks: Vec<Range<usize>>,
         expected: Vec<f64>,
     ) -> Result<()> {
-        let arr: ArrayRef = Arc::new(Int32Array::from(data));
-        let values = vec![arr];
-        let schema = Schema::new(vec![Field::new("arr", DataType::Int32, 
false)]);
-        let batch = RecordBatch::try_new(Arc::new(schema), values.clone())?;
         let result = expr
-            .create_evaluator(&batch)?
+            .create_evaluator()?
             .evaluate_with_rank(vec![range], ranks)?;
         assert_eq!(1, result.len());
         let result = as_float64_array(&result[0])?;
@@ -205,16 +186,11 @@ mod tests {
 
     fn test_i32_result(
         expr: &Rank,
-        data: Vec<i32>,
         ranks: Vec<Range<usize>>,
         expected: Vec<u64>,
     ) -> Result<()> {
-        let arr: ArrayRef = Arc::new(Int32Array::from(data));
-        let values = vec![arr];
-        let schema = Schema::new(vec![Field::new("arr", DataType::Int32, 
false)]);
-        let batch = RecordBatch::try_new(Arc::new(schema), values.clone())?;
         let result = expr
-            .create_evaluator(&batch)?
+            .create_evaluator()?
             .evaluate_with_rank(vec![0..8], ranks)?;
         assert_eq!(1, result.len());
         let result = as_uint64_array(&result[0])?;
@@ -245,25 +221,19 @@ mod tests {
 
         // empty case
         let expected = vec![0.0; 0];
-        test_f64_result(&r, vec![0; 0], 0..0, vec![0..0; 0], expected)?;
+        test_f64_result(&r, 0..0, vec![0..0; 0], expected)?;
 
         // singleton case
         let expected = vec![0.0];
-        test_f64_result(&r, vec![13], 0..1, vec![0..1], expected)?;
+        test_f64_result(&r, 0..1, vec![0..1], expected)?;
 
         // uniform case
         let expected = vec![0.0; 7];
-        test_f64_result(&r, vec![4; 7], 0..7, vec![0..7], expected)?;
+        test_f64_result(&r, 0..7, vec![0..7], expected)?;
 
         // non-trivial case
         let expected = vec![0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.5];
-        test_f64_result(
-            &r,
-            vec![1, 1, 1, 2, 2, 2, 2],
-            0..7,
-            vec![0..3, 3..7],
-            expected,
-        )?;
+        test_f64_result(&r, 0..7, vec![0..3, 3..7], expected)?;
 
         Ok(())
     }
diff --git a/datafusion/physical-expr/src/window/row_number.rs 
b/datafusion/physical-expr/src/window/row_number.rs
index 11f4f620d..f70d9ea37 100644
--- a/datafusion/physical-expr/src/window/row_number.rs
+++ b/datafusion/physical-expr/src/window/row_number.rs
@@ -22,7 +22,6 @@ use crate::window::BuiltInWindowFunctionExpr;
 use crate::PhysicalExpr;
 use arrow::array::{ArrayRef, UInt64Array};
 use arrow::datatypes::{DataType, Field};
-use arrow::record_batch::RecordBatch;
 use datafusion_common::Result;
 use std::any::Any;
 use std::ops::Range;
@@ -61,10 +60,7 @@ impl BuiltInWindowFunctionExpr for RowNumber {
         &self.name
     }
 
-    fn create_evaluator(
-        &self,
-        _batch: &RecordBatch,
-    ) -> Result<Box<dyn PartitionEvaluator>> {
+    fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
         Ok(Box::<NumRowsEvaluator>::default())
     }
 }
@@ -73,7 +69,11 @@ impl BuiltInWindowFunctionExpr for RowNumber {
 pub(crate) struct NumRowsEvaluator {}
 
 impl PartitionEvaluator for NumRowsEvaluator {
-    fn evaluate_partition(&self, partition: Range<usize>) -> Result<ArrayRef> {
+    fn evaluate_partition(
+        &self,
+        _values: &[ArrayRef],
+        partition: Range<usize>,
+    ) -> Result<ArrayRef> {
         let num_rows = partition.end - partition.start;
         Ok(Arc::new(UInt64Array::from_iter_values(
             1..(num_rows as u64) + 1,
@@ -96,7 +96,10 @@ mod tests {
         let schema = Schema::new(vec![Field::new("arr", DataType::Boolean, 
true)]);
         let batch = RecordBatch::try_new(Arc::new(schema), vec![arr])?;
         let row_number = RowNumber::new("row_number".to_owned());
-        let result = 
row_number.create_evaluator(&batch)?.evaluate(vec![0..8])?;
+        let values = row_number.evaluate_args(&batch)?;
+        let result = row_number
+            .create_evaluator()?
+            .evaluate(&values, vec![0..8])?;
         assert_eq!(1, result.len());
         let result = as_uint64_array(&result[0])?;
         let result = result.values();
@@ -112,7 +115,10 @@ mod tests {
         let schema = Schema::new(vec![Field::new("arr", DataType::Boolean, 
false)]);
         let batch = RecordBatch::try_new(Arc::new(schema), vec![arr])?;
         let row_number = RowNumber::new("row_number".to_owned());
-        let result = 
row_number.create_evaluator(&batch)?.evaluate(vec![0..8])?;
+        let values = row_number.evaluate_args(&batch)?;
+        let result = row_number
+            .create_evaluator()?
+            .evaluate(&values, vec![0..8])?;
         assert_eq!(1, result.len());
         let result = as_uint64_array(&result[0])?;
         let result = result.values();
diff --git a/datafusion/physical-expr/src/window/window_expr.rs 
b/datafusion/physical-expr/src/window/window_expr.rs
index 67caba51d..fe381935b 100644
--- a/datafusion/physical-expr/src/window/window_expr.rs
+++ b/datafusion/physical-expr/src/window/window_expr.rs
@@ -99,15 +99,32 @@ pub trait WindowExpr: Send + Sync + Debug {
             .collect()
     }
 
+    /// get order by columns, empty if absent
+    fn order_by_columns(&self, batch: &RecordBatch) -> Result<Vec<SortColumn>> 
{
+        self.order_by()
+            .iter()
+            .map(|e| e.evaluate_to_sort_column(batch))
+            .collect::<Result<Vec<SortColumn>>>()
+    }
+
     /// get sort columns that can be used for peer evaluation, empty if absent
     fn sort_columns(&self, batch: &RecordBatch) -> Result<Vec<SortColumn>> {
         let mut sort_columns = self.partition_columns(batch)?;
-        let order_by_columns = self
-            .order_by()
-            .iter()
-            .map(|e| e.evaluate_to_sort_column(batch))
-            .collect::<Result<Vec<SortColumn>>>()?;
+        let order_by_columns = self.order_by_columns(batch)?;
         sort_columns.extend(order_by_columns);
         Ok(sort_columns)
     }
+
+    /// Get values columns (arguments of the window function)
+    /// and order by columns (columns of the ORDER BY expression) used in evaluators
+    fn get_values_orderbys(
+        &self,
+        record_batch: &RecordBatch,
+    ) -> Result<(Vec<ArrayRef>, Vec<ArrayRef>)> {
+        let values = self.evaluate_args(record_batch)?;
+        let order_by_columns = self.order_by_columns(record_batch)?;
+        let order_bys: Vec<ArrayRef> =
+            order_by_columns.iter().map(|s| s.values.clone()).collect();
+        Ok((values, order_bys))
+    }
 }

Reply via email to