This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 09aea09b6 Refactor Builtin Window Function Implementation (#4441)
09aea09b6 is described below
commit 09aea09b6e7bd2950c0f57296ccb1a1995570057
Author: Mustafa akur <[email protected]>
AuthorDate: Thu Dec 1 18:43:57 2022 +0300
Refactor Builtin Window Function Implementation (#4441)
---
datafusion/physical-expr/src/window/aggregate.rs | 24 +++--------
datafusion/physical-expr/src/window/built_in.rs | 27 +++++-------
.../src/window/built_in_window_function_expr.rs | 16 ++++++--
datafusion/physical-expr/src/window/cume_dist.rs | 36 ++++------------
datafusion/physical-expr/src/window/lead_lag.rs | 26 +++++-------
datafusion/physical-expr/src/window/nth_value.rs | 36 ++++++----------
.../src/window/partition_evaluator.rs | 24 +++++++++--
datafusion/physical-expr/src/window/rank.rs | 48 ++++------------------
datafusion/physical-expr/src/window/row_number.rs | 22 ++++++----
datafusion/physical-expr/src/window/window_expr.rs | 27 +++++++++---
10 files changed, 122 insertions(+), 164 deletions(-)
diff --git a/datafusion/physical-expr/src/window/aggregate.rs
b/datafusion/physical-expr/src/window/aggregate.rs
index 69b0812c1..17a19b6d7 100644
--- a/datafusion/physical-expr/src/window/aggregate.rs
+++ b/datafusion/physical-expr/src/window/aggregate.rs
@@ -87,15 +87,9 @@ impl WindowExpr for AggregateWindowExpr {
let partition_columns = self.partition_columns(batch)?;
let partition_points =
self.evaluate_partition_points(batch.num_rows(),
&partition_columns)?;
- let values = self.evaluate_args(batch)?;
-
let sort_options: Vec<SortOptions> =
self.order_by.iter().map(|o| o.options).collect();
- let columns = self.sort_columns(batch)?;
- let order_columns: Vec<&ArrayRef> = columns.iter().map(|s|
&s.values).collect();
- // Sort values, this will make the same partitions consecutive. Also,
within the partition
- // range, values will be sorted.
- let order_bys = &order_columns[self.partition_by.len()..];
+ let (_, order_bys) = self.get_values_orderbys(batch)?;
let window_frame = if !order_bys.is_empty() &&
self.window_frame.is_none() {
// OVER (ORDER BY a) case
// We create an implicit window for ORDER BY.
@@ -107,14 +101,8 @@ impl WindowExpr for AggregateWindowExpr {
for partition_range in &partition_points {
let mut accumulator = self.aggregate.create_accumulator()?;
let length = partition_range.end - partition_range.start;
- let slice_order_bys = order_bys
- .iter()
- .map(|v| v.slice(partition_range.start, length))
- .collect::<Vec<_>>();
- let value_slice = values
- .iter()
- .map(|v| v.slice(partition_range.start, length))
- .collect::<Vec<_>>();
+ let (values, order_bys) =
+ self.get_values_orderbys(&batch.slice(partition_range.start,
length))?;
let mut window_frame_ctx = WindowFrameContext::new(&window_frame);
let mut last_range: (usize, usize) = (0, 0);
@@ -123,7 +111,7 @@ impl WindowExpr for AggregateWindowExpr {
// First, cur_range is calculated, then it is compared with
last_range.
for i in 0..length {
let cur_range = window_frame_ctx.calculate_range(
- &slice_order_bys,
+ &order_bys,
&sort_options,
length,
i,
@@ -135,7 +123,7 @@ impl WindowExpr for AggregateWindowExpr {
// Accumulate any new rows that have entered the window:
let update_bound = cur_range.1 - last_range.1;
if update_bound > 0 {
- let update: Vec<ArrayRef> = value_slice
+ let update: Vec<ArrayRef> = values
.iter()
.map(|v| v.slice(last_range.1, update_bound))
.collect();
@@ -144,7 +132,7 @@ impl WindowExpr for AggregateWindowExpr {
// Remove rows that have now left the window:
let retract_bound = cur_range.0 - last_range.0;
if retract_bound > 0 {
- let retract: Vec<ArrayRef> = value_slice
+ let retract: Vec<ArrayRef> = values
.iter()
.map(|v| v.slice(last_range.0, retract_bound))
.collect();
diff --git a/datafusion/physical-expr/src/window/built_in.rs
b/datafusion/physical-expr/src/window/built_in.rs
index da551d534..adf8d5b34 100644
--- a/datafusion/physical-expr/src/window/built_in.rs
+++ b/datafusion/physical-expr/src/window/built_in.rs
@@ -21,7 +21,6 @@ use super::window_frame_state::WindowFrameContext;
use super::BuiltInWindowFunctionExpr;
use super::WindowExpr;
use crate::{expressions::PhysicalSortExpr, PhysicalExpr};
-use arrow::array::Array;
use arrow::compute::{concat, SortOptions};
use arrow::record_batch::RecordBatch;
use arrow::{array::ArrayRef, datatypes::Field};
@@ -85,7 +84,7 @@ impl WindowExpr for BuiltInWindowExpr {
}
fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
- let evaluator = self.expr.create_evaluator(batch)?;
+ let evaluator = self.expr.create_evaluator()?;
let num_rows = batch.num_rows();
let partition_columns = self.partition_columns(batch)?;
let partition_points =
@@ -94,12 +93,7 @@ impl WindowExpr for BuiltInWindowExpr {
let results = if evaluator.uses_window_frame() {
let sort_options: Vec<SortOptions> =
self.order_by.iter().map(|o| o.options).collect();
- let columns = self.sort_columns(batch)?;
- let order_columns: Vec<&ArrayRef> =
- columns.iter().map(|s| &s.values).collect();
- // Sort values, this will make the same partitions consecutive.
Also, within the partition
- // range, values will be sorted.
- let order_bys = &order_columns[self.partition_by.len()..];
+ let (_, order_bys) = self.get_values_orderbys(batch)?;
let window_frame = if !order_bys.is_empty() &&
self.window_frame.is_none() {
// OVER (ORDER BY a) case
// We create an implicit window for ORDER BY.
@@ -110,24 +104,22 @@ impl WindowExpr for BuiltInWindowExpr {
let mut row_wise_results = vec![];
for partition_range in &partition_points {
let length = partition_range.end - partition_range.start;
- let slice_order_bys = order_bys
- .iter()
- .map(|v| v.slice(partition_range.start, length))
- .collect::<Vec<_>>();
+ let (values, order_bys) = self
+ .get_values_orderbys(&batch.slice(partition_range.start,
length))?;
let mut window_frame_ctx =
WindowFrameContext::new(&window_frame);
// We iterate on each row to calculate window frame range and
and window function result
for idx in 0..length {
let range = window_frame_ctx.calculate_range(
- &slice_order_bys,
+ &order_bys,
&sort_options,
num_rows,
idx,
)?;
let range = Range {
- start: partition_range.start + range.0,
- end: partition_range.start + range.1,
+ start: range.0,
+ end: range.1,
};
- let value = evaluator.evaluate_inside_range(range)?;
+ let value = evaluator.evaluate_inside_range(&values,
range)?;
row_wise_results.push(value.to_array());
}
}
@@ -138,7 +130,8 @@ impl WindowExpr for BuiltInWindowExpr {
self.evaluate_partition_points(num_rows, &columns)?;
evaluator.evaluate_with_rank(partition_points,
sort_partition_points)?
} else {
- evaluator.evaluate(partition_points)?
+ let (values, _) = self.get_values_orderbys(batch)?;
+ evaluator.evaluate(&values, partition_points)?
};
let results = results.iter().map(|i| i.as_ref()).collect::<Vec<_>>();
concat(&results).map_err(DataFusionError::ArrowError)
diff --git
a/datafusion/physical-expr/src/window/built_in_window_function_expr.rs
b/datafusion/physical-expr/src/window/built_in_window_function_expr.rs
index 43e1272bc..7f7a27435 100644
--- a/datafusion/physical-expr/src/window/built_in_window_function_expr.rs
+++ b/datafusion/physical-expr/src/window/built_in_window_function_expr.rs
@@ -17,6 +17,7 @@
use super::partition_evaluator::PartitionEvaluator;
use crate::PhysicalExpr;
+use arrow::array::ArrayRef;
use arrow::datatypes::Field;
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
@@ -45,9 +46,16 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync +
std::fmt::Debug {
"BuiltInWindowFunctionExpr: default name"
}
+ /// Evaluate window function arguments against the batch and return
+ /// a vector of array refs. Typically, the resulting vector has a single element.
+ fn evaluate_args(&self, batch: &RecordBatch) -> Result<Vec<ArrayRef>> {
+ self.expressions()
+ .iter()
+ .map(|e| e.evaluate(batch))
+ .map(|r| r.map(|v| v.into_array(batch.num_rows())))
+ .collect()
+ }
+
/// Create built-in window evaluator with a batch
- fn create_evaluator(
- &self,
- batch: &RecordBatch,
- ) -> Result<Box<dyn PartitionEvaluator>>;
+ fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>>;
}
diff --git a/datafusion/physical-expr/src/window/cume_dist.rs
b/datafusion/physical-expr/src/window/cume_dist.rs
index 922788816..4202058a3 100644
--- a/datafusion/physical-expr/src/window/cume_dist.rs
+++ b/datafusion/physical-expr/src/window/cume_dist.rs
@@ -24,7 +24,6 @@ use crate::PhysicalExpr;
use arrow::array::ArrayRef;
use arrow::array::Float64Array;
use arrow::datatypes::{DataType, Field};
-use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use std::any::Any;
use std::iter;
@@ -62,10 +61,7 @@ impl BuiltInWindowFunctionExpr for CumeDist {
&self.name
}
- fn create_evaluator(
- &self,
- _batch: &RecordBatch,
- ) -> Result<Box<dyn PartitionEvaluator>> {
+ fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
Ok(Box::new(CumeDistEvaluator {}))
}
}
@@ -77,12 +73,6 @@ impl PartitionEvaluator for CumeDistEvaluator {
true
}
- fn evaluate_partition(&self, _partition: Range<usize>) -> Result<ArrayRef>
{
- unreachable!(
- "cume_dist evaluation must be called with
evaluate_partition_with_rank"
- )
- }
-
fn evaluate_partition_with_rank(
&self,
partition: Range<usize>,
@@ -108,22 +98,16 @@ impl PartitionEvaluator for CumeDistEvaluator {
#[cfg(test)]
mod tests {
use super::*;
- use arrow::{array::*, datatypes::*};
use datafusion_common::cast::as_float64_array;
fn test_i32_result(
expr: &CumeDist,
- data: Vec<i32>,
partition: Range<usize>,
ranks: Vec<Range<usize>>,
expected: Vec<f64>,
) -> Result<()> {
- let arr: ArrayRef = Arc::new(Int32Array::from(data));
- let values = vec![arr];
- let schema = Schema::new(vec![Field::new("arr", DataType::Int32,
false)]);
- let batch = RecordBatch::try_new(Arc::new(schema), values.clone())?;
let result = expr
- .create_evaluator(&batch)?
+ .create_evaluator()?
.evaluate_with_rank(vec![partition], ranks)?;
assert_eq!(1, result.len());
let result = as_float64_array(&result[0])?;
@@ -137,25 +121,19 @@ mod tests {
let r = cume_dist("arr".into());
let expected = vec![0.0; 0];
- test_i32_result(&r, vec![], 0..0, vec![], expected)?;
+ test_i32_result(&r, 0..0, vec![], expected)?;
let expected = vec![1.0; 1];
- test_i32_result(&r, vec![20; 1], 0..1, vec![0..1], expected)?;
+ test_i32_result(&r, 0..1, vec![0..1], expected)?;
let expected = vec![1.0; 2];
- test_i32_result(&r, vec![20; 2], 0..2, vec![0..2], expected)?;
+ test_i32_result(&r, 0..2, vec![0..2], expected)?;
let expected = vec![0.5, 0.5, 1.0, 1.0];
- test_i32_result(&r, vec![1, 1, 2, 2], 0..4, vec![0..2, 2..4],
expected)?;
+ test_i32_result(&r, 0..4, vec![0..2, 2..4], expected)?;
let expected = vec![0.25, 0.5, 0.75, 1.0];
- test_i32_result(
- &r,
- vec![1, 2, 4, 5],
- 0..4,
- vec![0..1, 1..2, 2..3, 3..4],
- expected,
- )?;
+ test_i32_result(&r, 0..4, vec![0..1, 1..2, 2..3, 3..4], expected)?;
Ok(())
}
diff --git a/datafusion/physical-expr/src/window/lead_lag.rs
b/datafusion/physical-expr/src/window/lead_lag.rs
index 860df716a..ef4175d13 100644
--- a/datafusion/physical-expr/src/window/lead_lag.rs
+++ b/datafusion/physical-expr/src/window/lead_lag.rs
@@ -24,7 +24,6 @@ use crate::PhysicalExpr;
use arrow::array::ArrayRef;
use arrow::compute::cast;
use arrow::datatypes::{DataType, Field};
-use arrow::record_batch::RecordBatch;
use datafusion_common::ScalarValue;
use datafusion_common::{DataFusionError, Result};
use std::any::Any;
@@ -95,19 +94,9 @@ impl BuiltInWindowFunctionExpr for WindowShift {
&self.name
}
- fn create_evaluator(
- &self,
- batch: &RecordBatch,
- ) -> Result<Box<dyn PartitionEvaluator>> {
- let values = self
- .expressions()
- .iter()
- .map(|e| e.evaluate(batch))
- .map(|r| r.map(|v| v.into_array(batch.num_rows())))
- .collect::<Result<Vec<_>>>()?;
+ fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
Ok(Box::new(WindowShiftEvaluator {
shift_offset: self.shift_offset,
- values,
default_value: self.default_value.clone(),
}))
}
@@ -115,7 +104,6 @@ impl BuiltInWindowFunctionExpr for WindowShift {
pub(crate) struct WindowShiftEvaluator {
shift_offset: i64,
- values: Vec<ArrayRef>,
default_value: Option<ScalarValue>,
}
@@ -169,8 +157,13 @@ fn shift_with_default_value(
}
impl PartitionEvaluator for WindowShiftEvaluator {
- fn evaluate_partition(&self, partition: Range<usize>) -> Result<ArrayRef> {
- let value = &self.values[0];
+ fn evaluate_partition(
+ &self,
+ values: &[ArrayRef],
+ partition: Range<usize>,
+ ) -> Result<ArrayRef> {
+ // LEAD and LAG window functions take a single column, so `values` will have size 1
+ let value = &values[0];
let value = value.slice(partition.start, partition.end -
partition.start);
shift_with_default_value(&value, self.shift_offset,
self.default_value.as_ref())
}
@@ -190,7 +183,8 @@ mod tests {
let values = vec![arr];
let schema = Schema::new(vec![Field::new("arr", DataType::Int32,
false)]);
let batch = RecordBatch::try_new(Arc::new(schema), values.clone())?;
- let result = expr.create_evaluator(&batch)?.evaluate(vec![0..8])?;
+ let values = expr.evaluate_args(&batch)?;
+ let result = expr.create_evaluator()?.evaluate(&values, vec![0..8])?;
assert_eq!(1, result.len());
let result = as_int32_array(&result[0])?;
assert_eq!(expected, *result);
diff --git a/datafusion/physical-expr/src/window/nth_value.rs
b/datafusion/physical-expr/src/window/nth_value.rs
index e9988032c..e0afb520e 100644
--- a/datafusion/physical-expr/src/window/nth_value.rs
+++ b/datafusion/physical-expr/src/window/nth_value.rs
@@ -23,7 +23,6 @@ use crate::window::BuiltInWindowFunctionExpr;
use crate::PhysicalExpr;
use arrow::array::{Array, ArrayRef};
use arrow::datatypes::{DataType, Field};
-use arrow::record_batch::RecordBatch;
use datafusion_common::ScalarValue;
use datafusion_common::{DataFusionError, Result};
use std::any::Any;
@@ -116,27 +115,14 @@ impl BuiltInWindowFunctionExpr for NthValue {
&self.name
}
- fn create_evaluator(
- &self,
- batch: &RecordBatch,
- ) -> Result<Box<dyn PartitionEvaluator>> {
- let values = self
- .expressions()
- .iter()
- .map(|e| e.evaluate(batch))
- .map(|r| r.map(|v| v.into_array(batch.num_rows())))
- .collect::<Result<Vec<_>>>()?;
- Ok(Box::new(NthValueEvaluator {
- kind: self.kind,
- values,
- }))
+ fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
+ Ok(Box::new(NthValueEvaluator { kind: self.kind }))
}
}
/// Value evaluator for nth_value functions
pub(crate) struct NthValueEvaluator {
kind: NthValueKind,
- values: Vec<ArrayRef>,
}
impl PartitionEvaluator for NthValueEvaluator {
@@ -144,12 +130,13 @@ impl PartitionEvaluator for NthValueEvaluator {
true
}
- fn evaluate_partition(&self, _partition: Range<usize>) -> Result<ArrayRef>
{
- unreachable!("first, last, and nth_value evaluation must be called
with evaluate_partition_with_rank")
- }
-
- fn evaluate_inside_range(&self, range: Range<usize>) ->
Result<ScalarValue> {
- let arr = &self.values[0];
+ fn evaluate_inside_range(
+ &self,
+ values: &[ArrayRef],
+ range: Range<usize>,
+ ) -> Result<ScalarValue> {
+ // FIRST_VALUE, LAST_VALUE and NTH_VALUE window functions take a single column, so `values` will have size 1
+ let arr = &values[0];
let n_range = range.end - range.start;
match self.kind {
NthValueKind::First => ScalarValue::try_from_array(arr,
range.start),
@@ -188,10 +175,11 @@ mod tests {
end: i + 1,
})
}
- let evaluator = expr.create_evaluator(&batch)?;
+ let evaluator = expr.create_evaluator()?;
+ let values = expr.evaluate_args(&batch)?;
let result = ranges
.into_iter()
- .map(|range| evaluator.evaluate_inside_range(range))
+ .map(|range| evaluator.evaluate_inside_range(&values, range))
.into_iter()
.collect::<Result<Vec<ScalarValue>>>()?;
let result = ScalarValue::iter_to_array(result.into_iter())?;
diff --git a/datafusion/physical-expr/src/window/partition_evaluator.rs
b/datafusion/physical-expr/src/window/partition_evaluator.rs
index 4ecfd87a9..1608758d6 100644
--- a/datafusion/physical-expr/src/window/partition_evaluator.rs
+++ b/datafusion/physical-expr/src/window/partition_evaluator.rs
@@ -51,10 +51,14 @@ pub trait PartitionEvaluator {
}
/// evaluate the partition evaluator against the partitions
- fn evaluate(&self, partition_points: Vec<Range<usize>>) ->
Result<Vec<ArrayRef>> {
+ fn evaluate(
+ &self,
+ values: &[ArrayRef],
+ partition_points: Vec<Range<usize>>,
+ ) -> Result<Vec<ArrayRef>> {
partition_points
.into_iter()
- .map(|partition| self.evaluate_partition(partition))
+ .map(|partition| self.evaluate_partition(values, partition))
.collect()
}
@@ -75,7 +79,15 @@ pub trait PartitionEvaluator {
}
/// evaluate the partition evaluator against the partition
- fn evaluate_partition(&self, _partition: Range<usize>) -> Result<ArrayRef>;
+ fn evaluate_partition(
+ &self,
+ _values: &[ArrayRef],
+ _partition: Range<usize>,
+ ) -> Result<ArrayRef> {
+ Err(DataFusionError::NotImplemented(
+ "evaluate_partition is not implemented by default".into(),
+ ))
+ }
/// evaluate the partition evaluator against the partition but with rank
fn evaluate_partition_with_rank(
@@ -89,7 +101,11 @@ pub trait PartitionEvaluator {
}
/// evaluate window function result inside given range
- fn evaluate_inside_range(&self, _range: Range<usize>) ->
Result<ScalarValue> {
+ fn evaluate_inside_range(
+ &self,
+ _values: &[ArrayRef],
+ _range: Range<usize>,
+ ) -> Result<ScalarValue> {
Err(DataFusionError::NotImplemented(
"evaluate_inside_range is not implemented by default".into(),
))
diff --git a/datafusion/physical-expr/src/window/rank.rs
b/datafusion/physical-expr/src/window/rank.rs
index ec9aca532..3447d47b3 100644
--- a/datafusion/physical-expr/src/window/rank.rs
+++ b/datafusion/physical-expr/src/window/rank.rs
@@ -24,7 +24,6 @@ use crate::PhysicalExpr;
use arrow::array::ArrayRef;
use arrow::array::{Float64Array, UInt64Array};
use arrow::datatypes::{DataType, Field};
-use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use std::any::Any;
use std::iter;
@@ -92,10 +91,7 @@ impl BuiltInWindowFunctionExpr for Rank {
&self.name
}
- fn create_evaluator(
- &self,
- _batch: &RecordBatch,
- ) -> Result<Box<dyn PartitionEvaluator>> {
+ fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
Ok(Box::new(RankEvaluator {
rank_type: self.rank_type,
}))
@@ -111,10 +107,6 @@ impl PartitionEvaluator for RankEvaluator {
true
}
- fn evaluate_partition(&self, _partition: Range<usize>) -> Result<ArrayRef>
{
- unreachable!("rank evaluation must be called with
evaluate_partition_with_rank")
- }
-
fn evaluate_partition_with_rank(
&self,
partition: Range<usize>,
@@ -166,35 +158,24 @@ impl PartitionEvaluator for RankEvaluator {
#[cfg(test)]
mod tests {
use super::*;
- use arrow::{array::*, datatypes::*};
use datafusion_common::cast::{as_float64_array, as_uint64_array};
fn test_with_rank(expr: &Rank, expected: Vec<u64>) -> Result<()> {
- test_i32_result(
- expr,
- vec![-2, -2, 1, 3, 3, 3, 7, 8],
- vec![0..2, 2..3, 3..6, 6..7, 7..8],
- expected,
- )
+ test_i32_result(expr, vec![0..2, 2..3, 3..6, 6..7, 7..8], expected)
}
fn test_without_rank(expr: &Rank, expected: Vec<u64>) -> Result<()> {
- test_i32_result(expr, vec![-2, -2, 1, 3, 3, 3, 7, 8], vec![0..8],
expected)
+ test_i32_result(expr, vec![0..8], expected)
}
fn test_f64_result(
expr: &Rank,
- data: Vec<i32>,
range: Range<usize>,
ranks: Vec<Range<usize>>,
expected: Vec<f64>,
) -> Result<()> {
- let arr: ArrayRef = Arc::new(Int32Array::from(data));
- let values = vec![arr];
- let schema = Schema::new(vec![Field::new("arr", DataType::Int32,
false)]);
- let batch = RecordBatch::try_new(Arc::new(schema), values.clone())?;
let result = expr
- .create_evaluator(&batch)?
+ .create_evaluator()?
.evaluate_with_rank(vec![range], ranks)?;
assert_eq!(1, result.len());
let result = as_float64_array(&result[0])?;
@@ -205,16 +186,11 @@ mod tests {
fn test_i32_result(
expr: &Rank,
- data: Vec<i32>,
ranks: Vec<Range<usize>>,
expected: Vec<u64>,
) -> Result<()> {
- let arr: ArrayRef = Arc::new(Int32Array::from(data));
- let values = vec![arr];
- let schema = Schema::new(vec![Field::new("arr", DataType::Int32,
false)]);
- let batch = RecordBatch::try_new(Arc::new(schema), values.clone())?;
let result = expr
- .create_evaluator(&batch)?
+ .create_evaluator()?
.evaluate_with_rank(vec![0..8], ranks)?;
assert_eq!(1, result.len());
let result = as_uint64_array(&result[0])?;
@@ -245,25 +221,19 @@ mod tests {
// empty case
let expected = vec![0.0; 0];
- test_f64_result(&r, vec![0; 0], 0..0, vec![0..0; 0], expected)?;
+ test_f64_result(&r, 0..0, vec![0..0; 0], expected)?;
// singleton case
let expected = vec![0.0];
- test_f64_result(&r, vec![13], 0..1, vec![0..1], expected)?;
+ test_f64_result(&r, 0..1, vec![0..1], expected)?;
// uniform case
let expected = vec![0.0; 7];
- test_f64_result(&r, vec![4; 7], 0..7, vec![0..7], expected)?;
+ test_f64_result(&r, 0..7, vec![0..7], expected)?;
// non-trivial case
let expected = vec![0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.5];
- test_f64_result(
- &r,
- vec![1, 1, 1, 2, 2, 2, 2],
- 0..7,
- vec![0..3, 3..7],
- expected,
- )?;
+ test_f64_result(&r, 0..7, vec![0..3, 3..7], expected)?;
Ok(())
}
diff --git a/datafusion/physical-expr/src/window/row_number.rs
b/datafusion/physical-expr/src/window/row_number.rs
index 11f4f620d..f70d9ea37 100644
--- a/datafusion/physical-expr/src/window/row_number.rs
+++ b/datafusion/physical-expr/src/window/row_number.rs
@@ -22,7 +22,6 @@ use crate::window::BuiltInWindowFunctionExpr;
use crate::PhysicalExpr;
use arrow::array::{ArrayRef, UInt64Array};
use arrow::datatypes::{DataType, Field};
-use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use std::any::Any;
use std::ops::Range;
@@ -61,10 +60,7 @@ impl BuiltInWindowFunctionExpr for RowNumber {
&self.name
}
- fn create_evaluator(
- &self,
- _batch: &RecordBatch,
- ) -> Result<Box<dyn PartitionEvaluator>> {
+ fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
Ok(Box::<NumRowsEvaluator>::default())
}
}
@@ -73,7 +69,11 @@ impl BuiltInWindowFunctionExpr for RowNumber {
pub(crate) struct NumRowsEvaluator {}
impl PartitionEvaluator for NumRowsEvaluator {
- fn evaluate_partition(&self, partition: Range<usize>) -> Result<ArrayRef> {
+ fn evaluate_partition(
+ &self,
+ _values: &[ArrayRef],
+ partition: Range<usize>,
+ ) -> Result<ArrayRef> {
let num_rows = partition.end - partition.start;
Ok(Arc::new(UInt64Array::from_iter_values(
1..(num_rows as u64) + 1,
@@ -96,7 +96,10 @@ mod tests {
let schema = Schema::new(vec![Field::new("arr", DataType::Boolean,
true)]);
let batch = RecordBatch::try_new(Arc::new(schema), vec![arr])?;
let row_number = RowNumber::new("row_number".to_owned());
- let result =
row_number.create_evaluator(&batch)?.evaluate(vec![0..8])?;
+ let values = row_number.evaluate_args(&batch)?;
+ let result = row_number
+ .create_evaluator()?
+ .evaluate(&values, vec![0..8])?;
assert_eq!(1, result.len());
let result = as_uint64_array(&result[0])?;
let result = result.values();
@@ -112,7 +115,10 @@ mod tests {
let schema = Schema::new(vec![Field::new("arr", DataType::Boolean,
false)]);
let batch = RecordBatch::try_new(Arc::new(schema), vec![arr])?;
let row_number = RowNumber::new("row_number".to_owned());
- let result =
row_number.create_evaluator(&batch)?.evaluate(vec![0..8])?;
+ let values = row_number.evaluate_args(&batch)?;
+ let result = row_number
+ .create_evaluator()?
+ .evaluate(&values, vec![0..8])?;
assert_eq!(1, result.len());
let result = as_uint64_array(&result[0])?;
let result = result.values();
diff --git a/datafusion/physical-expr/src/window/window_expr.rs
b/datafusion/physical-expr/src/window/window_expr.rs
index 67caba51d..fe381935b 100644
--- a/datafusion/physical-expr/src/window/window_expr.rs
+++ b/datafusion/physical-expr/src/window/window_expr.rs
@@ -99,15 +99,32 @@ pub trait WindowExpr: Send + Sync + Debug {
.collect()
}
+ /// get order by columns, empty if absent
+ fn order_by_columns(&self, batch: &RecordBatch) -> Result<Vec<SortColumn>>
{
+ self.order_by()
+ .iter()
+ .map(|e| e.evaluate_to_sort_column(batch))
+ .collect::<Result<Vec<SortColumn>>>()
+ }
+
/// get sort columns that can be used for peer evaluation, empty if absent
fn sort_columns(&self, batch: &RecordBatch) -> Result<Vec<SortColumn>> {
let mut sort_columns = self.partition_columns(batch)?;
- let order_by_columns = self
- .order_by()
- .iter()
- .map(|e| e.evaluate_to_sort_column(batch))
- .collect::<Result<Vec<SortColumn>>>()?;
+ let order_by_columns = self.order_by_columns(batch)?;
sort_columns.extend(order_by_columns);
Ok(sort_columns)
}
+
+ /// Get value columns (arguments of the window function)
+ /// and order by columns (columns of the ORDER BY expression) used in evaluators
+ fn get_values_orderbys(
+ &self,
+ record_batch: &RecordBatch,
+ ) -> Result<(Vec<ArrayRef>, Vec<ArrayRef>)> {
+ let values = self.evaluate_args(record_batch)?;
+ let order_by_columns = self.order_by_columns(record_batch)?;
+ let order_bys: Vec<ArrayRef> =
+ order_by_columns.iter().map(|s| s.values.clone()).collect();
+ Ok((values, order_bys))
+ }
}