Michael-J-Ward commented on code in PR #880: URL: https://github.com/apache/datafusion-python/pull/880#discussion_r1773701467
########## docs/source/user-guide/common-operations/udf-and-udfa.rst: ########## @@ -18,8 +18,21 @@ User Defined Functions ====================== -DataFusion provides powerful expressions and functions, reducing the need for custom Python functions. -However you can still incorporate your own functions, i.e. User-Defined Functions (UDFs), with the :py:func:`~datafusion.udf.ScalarUDF.udf` function. +DataFusion provides powerful expressions and functions, reducing the need for custom Python +functions. However you can still incorporate your own functions, i.e. User-Defined Functions (UDFs). + +Scalar Functions +---------------- + +When writing a user defined function that can operate on a row by row basis, these are called Scalar +Functions. You can define your own scalar function by calling +:py:func:`~datafusion.udf.ScalarUDF.udf` . + +The basic definition of a scalar UDF is a python function that takes one or more +`pyarrow <https://arrow.apache.org/docs/python/index.html>`_ arrays and returns a single array as +output. DataFusion scalar UDFs operate on an entire batch of record at a time, though the evaluation Review Comment: ```suggestion output. DataFusion scalar UDFs operate on an entire batch of records at a time, though the evaluation ``` ########## python/datafusion/udf.py: ########## @@ -246,3 +246,229 @@ def udaf( state_type=state_type, volatility=volatility, ) + + +class WindowEvaluator(metaclass=ABCMeta): + """Evaluator class for user defined window functions (UDWF). + + It is up to the user to decide which evaluate function is appropriate. 
+ + |``uses_window_frame``|``supports_bounded_execution``|``include_rank``|function_to_implement| + |---|---|----|----| + |False (default) |False (default) |False (default) | ``evaluate_all`` | + |False |True |False | ``evaluate`` | + |False |True/False |True | ``evaluate_all_with_rank`` | + |True |True/False |True/False | ``evaluate`` | + """ # noqa: W505 + + def memoize(self) -> None: + """Perform a memoize operation to improve performance. + + When the window frame has a fixed beginning (e.g UNBOUNDED Review Comment: Q for my own understanding: Should this be "When the window frame has a fixed **bound** (beginning or end)..."? I don't see how `last_value` could benefit from `memoize` because of a fixed bound? (If I'm correct, I'll go fix the docs upstream) https://docs.rs/datafusion/latest/datafusion/logical_expr/trait.PartitionEvaluator.html#method.memoize ########## src/udwf.rs: ########## @@ -0,0 +1,305 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::any::Any; +use std::ops::Range; +use std::sync::Arc; + +use arrow::array::{make_array, Array, ArrayData, ArrayRef}; +use datafusion::logical_expr::window_state::WindowAggState; +use datafusion::scalar::ScalarValue; +use pyo3::exceptions::PyValueError; +use pyo3::prelude::*; + +use datafusion::arrow::datatypes::DataType; +use datafusion::arrow::pyarrow::{FromPyArrow, PyArrowType, ToPyArrow}; +use datafusion::error::{DataFusionError, Result}; +use datafusion::logical_expr::{ + PartitionEvaluator, PartitionEvaluatorFactory, Signature, Volatility, WindowUDF, WindowUDFImpl, +}; +use pyo3::types::{PyList, PyTuple}; + +use crate::expr::PyExpr; +use crate::utils::parse_volatility; + +#[derive(Debug)] +struct RustPartitionEvaluator { + evaluator: PyObject, +} + +impl RustPartitionEvaluator { + fn new(evaluator: PyObject) -> Self { + Self { evaluator } + } +} + +impl PartitionEvaluator for RustPartitionEvaluator { + fn memoize(&mut self, _state: &mut WindowAggState) -> Result<()> { + Python::with_gil(|py| self.evaluator.bind(py).call_method0("memoize").map(|_| ())) + .map_err(|e| DataFusionError::Execution(format!("{e}"))) + } + + fn get_range(&self, idx: usize, n_rows: usize) -> Result<Range<usize>> { + Python::with_gil(|py| { + let py_args = vec![idx.to_object(py), n_rows.to_object(py)]; + let py_args = PyTuple::new_bound(py, py_args); + + self.evaluator + .bind(py) + .call_method1("get_range", py_args) + .and_then(|v| { + let tuple: Bound<'_, PyTuple> = v.extract()?; + if tuple.len() != 2 { + return Err(PyValueError::new_err(format!( + "Expected get_range to return tuple of length 2. 
Received length {}", + tuple.len() + ))); + } + + let start: usize = tuple.get_item(0).unwrap().extract()?; + let end: usize = tuple.get_item(1).unwrap().extract()?; + + Ok(Range { start, end }) + }) + }) + .map_err(|e| DataFusionError::Execution(format!("{e}"))) + } + + fn is_causal(&self) -> bool { + Python::with_gil(|py| { + self.evaluator + .bind(py) + .call_method0("is_causal") + .and_then(|v| v.extract()) + .unwrap_or(false) + }) + } + + fn evaluate_all(&mut self, values: &[ArrayRef], num_rows: usize) -> Result<ArrayRef> { + Python::with_gil(|py| { + let py_values = PyList::new_bound( + py, + values + .iter() + .map(|arg| arg.into_data().to_pyarrow(py).unwrap()), + ); + let py_num_rows = num_rows.to_object(py).into_bound(py); + let py_args = PyTuple::new_bound( + py, + PyTuple::new_bound(py, vec![py_values.as_any(), &py_num_rows]), + ); + + self.evaluator + .bind(py) + .call_method1("evaluate_all", py_args) + .map(|v| { + let array_data = ArrayData::from_pyarrow_bound(&v).unwrap(); + make_array(array_data) + }) + .map_err(|e| DataFusionError::Execution(format!("{e}"))) + }) + } + + fn evaluate(&mut self, values: &[ArrayRef], range: &Range<usize>) -> Result<ScalarValue> { + Python::with_gil(|py| { + let py_values = PyList::new_bound( + py, + values + .iter() + .map(|arg| arg.into_data().to_pyarrow(py).unwrap()), + ); + let range_tuple = + PyTuple::new_bound(py, vec![range.start.to_object(py), range.end.to_object(py)]); + let py_args = PyTuple::new_bound( + py, + PyTuple::new_bound(py, vec![py_values.as_any(), range_tuple.as_any()]), + ); + + self.evaluator + .bind(py) + .call_method1("evaluate", py_args) + .and_then(|v| v.extract()) + .map_err(|e| DataFusionError::Execution(format!("{e}"))) + }) + } + + fn evaluate_all_with_rank( + &self, + num_rows: usize, + ranks_in_partition: &[Range<usize>], + ) -> Result<ArrayRef> { + Python::with_gil(|py| { + let ranks = ranks_in_partition + .iter() + .map(|r| PyTuple::new_bound(py, vec![r.start, r.end])); + + // 1. 
cast args to Pyarrow array + let py_args = vec![num_rows.to_object(py), PyList::new_bound(py, ranks).into()]; + + let py_args = PyTuple::new_bound(py, py_args); + + // 2. call function + self.evaluator + .bind(py) + .call_method1("evaluate_all_with_rank", py_args) + .map_err(|e| DataFusionError::Execution(format!("{e}"))) + .map(|v| { + let array_data = ArrayData::from_pyarrow_bound(&v).unwrap(); + make_array(array_data) + }) + }) + } + + fn supports_bounded_execution(&self) -> bool { + Python::with_gil(|py| { + self.evaluator + .bind(py) + .call_method0("supports_bounded_execution") + .and_then(|v| v.extract()) + .unwrap_or(false) + }) + } + + fn uses_window_frame(&self) -> bool { + Python::with_gil(|py| { + self.evaluator + .bind(py) + .call_method0("uses_window_frame") + .and_then(|v| v.extract()) + .unwrap_or(false) + }) + } + + fn include_rank(&self) -> bool { + Python::with_gil(|py| { + self.evaluator + .bind(py) + .call_method0("include_rank") + .and_then(|v| v.extract()) + .unwrap_or(false) + }) + } +} + +pub fn to_rust_partition_evaluator(evaluator: PyObject) -> PartitionEvaluatorFactory { + Arc::new(move || -> Result<Box<dyn PartitionEvaluator>> { + let evaluator = Python::with_gil(|py| evaluator.clone_ref(py)); + Ok(Box::new(RustPartitionEvaluator::new(evaluator))) + }) +} + +/// Represents an WindowUDF +#[pyclass(name = "WindowUDF", module = "datafusion", subclass)] +#[derive(Debug, Clone)] +pub struct PyWindowUDF { + pub(crate) function: WindowUDF, +} + +#[pymethods] +impl PyWindowUDF { + #[new] + #[pyo3(signature=(name, evaluator, input_types, return_type, volatility))] + fn new( + name: &str, + evaluator: PyObject, + input_types: Vec<PyArrowType<DataType>>, + return_type: PyArrowType<DataType>, + volatility: &str, + ) -> PyResult<Self> { + let return_type = return_type.0; + let input_types = input_types.into_iter().map(|t| t.0).collect(); + + let function = WindowUDF::from(MultiColumnWindowUDF::new( + name, + input_types, + return_type, + 
parse_volatility(volatility)?, + to_rust_partition_evaluator(evaluator), + )); + Ok(Self { function }) + } + + /// creates a new PyExpr with the call of the udf + #[pyo3(signature = (*args))] + fn __call__(&self, args: Vec<PyExpr>) -> PyResult<PyExpr> { + let args = args.iter().map(|e| e.expr.clone()).collect(); + Ok(self.function.call(args).into()) + } + + fn __repr__(&self) -> PyResult<String> { + Ok(format!("WindowUDF({})", self.function.name())) + } +} + +pub struct MultiColumnWindowUDF { Review Comment: 2 Qs: 1) Does this need to be public? 2) What's the motivation behind the name? The `Multi` makes me expect to see a `SingleColumnWindowUDF` counterpart. ########## src/udwf.rs: ########## @@ -0,0 +1,305 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::any::Any; +use std::ops::Range; +use std::sync::Arc; + +use arrow::array::{make_array, Array, ArrayData, ArrayRef}; +use datafusion::logical_expr::window_state::WindowAggState; +use datafusion::scalar::ScalarValue; +use pyo3::exceptions::PyValueError; +use pyo3::prelude::*; + +use datafusion::arrow::datatypes::DataType; +use datafusion::arrow::pyarrow::{FromPyArrow, PyArrowType, ToPyArrow}; +use datafusion::error::{DataFusionError, Result}; +use datafusion::logical_expr::{ + PartitionEvaluator, PartitionEvaluatorFactory, Signature, Volatility, WindowUDF, WindowUDFImpl, +}; +use pyo3::types::{PyList, PyTuple}; + +use crate::expr::PyExpr; +use crate::utils::parse_volatility; + +#[derive(Debug)] +struct RustPartitionEvaluator { Review Comment: Why `RustPartitionEvaluator` instead of `PyPartitionEvaluator`? ########## python/datafusion/udf.py: ########## @@ -246,3 +246,229 @@ def udaf( state_type=state_type, volatility=volatility, ) + + +class WindowEvaluator(metaclass=ABCMeta): + """Evaluator class for user defined window functions (UDWF). + + It is up to the user to decide which evaluate function is appropriate. 
+ + |``uses_window_frame``|``supports_bounded_execution``|``include_rank``|function_to_implement| + |---|---|----|----| + |False (default) |False (default) |False (default) | ``evaluate_all`` | + |False |True |False | ``evaluate`` | + |False |True/False |True | ``evaluate_all_with_rank`` | + |True |True/False |True/False | ``evaluate`` |

Review Comment:

```suggestion
+------------------------+--------------------------------+------------------+---------------------------+
| ``uses_window_frame``  | ``supports_bounded_execution`` | ``include_rank`` | function_to_implement     |
+========================+================================+==================+===========================+
| False (default)        | False (default)                | False (default)  | ``evaluate_all``          |
+------------------------+--------------------------------+------------------+---------------------------+
| False                  | True                           | False            | ``evaluate``              |
+------------------------+--------------------------------+------------------+---------------------------+
| False                  | True/False                     | True             | ``evaluate_all_with_rank``|
+------------------------+--------------------------------+------------------+---------------------------+
| True                   | True/False                     | True/False       | ``evaluate``              |
+------------------------+--------------------------------+------------------+---------------------------+
```

Does the table render properly for you? I had to update to this suggestion in order for the table to render properly.

########## python/datafusion/udf.py: ########## @@ -246,3 +246,229 @@ def udaf( state_type=state_type, volatility=volatility, ) + + +class WindowEvaluator(metaclass=ABCMeta): + """Evaluator class for user defined window functions (UDWF). + + It is up to the user to decide which evaluate function is appropriate. 
+ + |``uses_window_frame``|``supports_bounded_execution``|``include_rank``|function_to_implement| + |---|---|----|----| + |False (default) |False (default) |False (default) | ``evaluate_all`` | + |False |True |False | ``evaluate`` | + |False |True/False |True | ``evaluate_all_with_rank`` | + |True |True/False |True/False | ``evaluate`` | + """ # noqa: W505 + + def memoize(self) -> None: + """Perform a memoize operation to improve performance. + + When the window frame has a fixed beginning (e.g UNBOUNDED + PRECEDING), some functions such as FIRST_VALUE, LAST_VALUE and + NTH_VALUE do not need the (unbounded) input once they have + seen a certain amount of input. + + `memoize` is called after each input batch is processed, and + such functions can save whatever they need + """ + pass + + def get_range(self, idx: int, num_rows: int) -> tuple[int, int]: + """Return the range for the window fuction. + + If `uses_window_frame` flag is `false`. This method is used to + calculate required range for the window function during + stateful execution. + + Generally there is no required range, hence by default this + returns smallest range(current row). e.g seeing current row is + enough to calculate window result (such as row_number, rank, + etc) + + Args: + idx:: Current index + num_rows: Number of rows. + """ + return (idx, idx + 1) + + def is_causal(self) -> bool: + """Get whether evaluator needs future data for its result.""" + return False + + def evaluate_all(self, values: list[pyarrow.Array], num_rows: int) -> pyarrow.Array: + """Evaluate a window function on an entire input partition. + + This function is called once per input *partition* for window + functions that *do not use* values from the window frame, + such as `ROW_NUMBER`, `RANK`, `DENSE_RANK`, `PERCENT_RANK`, + `CUME_DIST`, `LEAD`, `LAG`). + + It produces the result of all rows in a single pass. 
It + expects to receive the entire partition as the `value` and + must produce an output column with one output row for every + input row. + + `num_rows` is required to correctly compute the output in case + `values.len() == 0` + + Implementing this function is an optimization: certain window + functions are not affected by the window frame definition or + the query doesn't have a frame, and `evaluate` skips the + (costly) window frame boundary calculation and the overhead of + calling `evaluate` for each output row. + + For example, the `LAG` built in window function does not use + the values of its window frame (it can be computed in one shot + on the entire partition with `Self::evaluate_all` regardless of the + window defined in the `OVER` clause) + + ```sql + lag(x, 1) OVER (ORDER BY z ROWS BETWEEN 2 PRECEDING AND 3 FOLLOWING) + ``` + + However, `avg()` computes the average in the window and thus + does use its window frame + + ```sql + avg(x) OVER (PARTITION BY y ORDER BY z ROWS BETWEEN 2 PRECEDING AND 3 FOLLOWING) + ``` + """ + pass + + def evaluate( + self, values: list[pyarrow.Array], eval_range: tuple[int, int] + ) -> pyarrow.Scalar: + """Evaluate window function on a range of rows in an input partition. + + This is the simplest and most general function to implement + but also the least performant as it creates output one row at + a time. It is typically much faster to implement stateful + evaluation using one of the other specialized methods on this + trait. + + Returns a [`ScalarValue`] that is the value of the window + function within `range` for the entire partition. Argument + `values` contains the evaluation result of function arguments + and evaluation results of ORDER BY expressions. If function has a + single argument, `values[1..]` will contain ORDER BY expression results. 
+ """ + pass + + def evaluate_all_with_rank( + self, num_rows: int, ranks_in_partition: list[tuple[int, int]] + ) -> pyarrow.Array: + """Called for window functions that only need the rank of a row. + + Evaluate the partition evaluator against the partition using + the row ranks. For example, `RANK(col)` produces + + ```text + col | rank + --- + ---- + A | 1 + A | 1 + C | 3 + D | 4 + D | 5 + ``` Review Comment: ```suggestion .. code-block:: text col | rank --- + ---- A | 1 A | 1 C | 3 D | 4 D | 4 ``` Same Q for this table. ########## python/datafusion/udf.py: ########## @@ -246,3 +246,229 @@ def udaf( state_type=state_type, volatility=volatility, ) + + +class WindowEvaluator(metaclass=ABCMeta): + """Evaluator class for user defined window functions (UDWF). + + It is up to the user to decide which evaluate function is appropriate. + + |``uses_window_frame``|``supports_bounded_execution``|``include_rank``|function_to_implement| + |---|---|----|----| + |False (default) |False (default) |False (default) | ``evaluate_all`` | + |False |True |False | ``evaluate`` | + |False |True/False |True | ``evaluate_all_with_rank`` | + |True |True/False |True/False | ``evaluate`` | + """ # noqa: W505 + + def memoize(self) -> None: + """Perform a memoize operation to improve performance. + + When the window frame has a fixed beginning (e.g UNBOUNDED + PRECEDING), some functions such as FIRST_VALUE, LAST_VALUE and + NTH_VALUE do not need the (unbounded) input once they have + seen a certain amount of input. + + `memoize` is called after each input batch is processed, and + such functions can save whatever they need + """ + pass + + def get_range(self, idx: int, num_rows: int) -> tuple[int, int]: + """Return the range for the window fuction. + + If `uses_window_frame` flag is `false`. This method is used to + calculate required range for the window function during + stateful execution. + + Generally there is no required range, hence by default this + returns smallest range(current row). 
e.g seeing current row is + enough to calculate window result (such as row_number, rank, + etc) + + Args: + idx:: Current index + num_rows: Number of rows. + """ + return (idx, idx + 1) + + def is_causal(self) -> bool: + """Get whether evaluator needs future data for its result.""" + return False + + def evaluate_all(self, values: list[pyarrow.Array], num_rows: int) -> pyarrow.Array: + """Evaluate a window function on an entire input partition. + + This function is called once per input *partition* for window + functions that *do not use* values from the window frame, + such as `ROW_NUMBER`, `RANK`, `DENSE_RANK`, `PERCENT_RANK`, + `CUME_DIST`, `LEAD`, `LAG`). + + It produces the result of all rows in a single pass. It + expects to receive the entire partition as the `value` and + must produce an output column with one output row for every + input row. + + `num_rows` is required to correctly compute the output in case + `values.len() == 0` + + Implementing this function is an optimization: certain window + functions are not affected by the window frame definition or + the query doesn't have a frame, and `evaluate` skips the + (costly) window frame boundary calculation and the overhead of + calling `evaluate` for each output row. + + For example, the `LAG` built in window function does not use + the values of its window frame (it can be computed in one shot + on the entire partition with `Self::evaluate_all` regardless of the + window defined in the `OVER` clause) + + ```sql + lag(x, 1) OVER (ORDER BY z ROWS BETWEEN 2 PRECEDING AND 3 FOLLOWING) + ``` + + However, `avg()` computes the average in the window and thus + does use its window frame + + ```sql + avg(x) OVER (PARTITION BY y ORDER BY z ROWS BETWEEN 2 PRECEDING AND 3 FOLLOWING) + ``` + """ + pass + + def evaluate( + self, values: list[pyarrow.Array], eval_range: tuple[int, int] + ) -> pyarrow.Scalar: + """Evaluate window function on a range of rows in an input partition. 
+ + This is the simplest and most general function to implement + but also the least performant as it creates output one row at + a time. It is typically much faster to implement stateful + evaluation using one of the other specialized methods on this + trait. + + Returns a [`ScalarValue`] that is the value of the window + function within `range` for the entire partition. Argument + `values` contains the evaluation result of function arguments + and evaluation results of ORDER BY expressions. If function has a + single argument, `values[1..]` will contain ORDER BY expression results. + """ + pass + + def evaluate_all_with_rank( + self, num_rows: int, ranks_in_partition: list[tuple[int, int]] + ) -> pyarrow.Array: + """Called for window functions that only need the rank of a row. + + Evaluate the partition evaluator against the partition using + the row ranks. For example, `RANK(col)` produces + + ```text + col | rank + --- + ---- + A | 1 + A | 1 + C | 3 + D | 4 + D | 5 + ``` + + For this case, `num_rows` would be `5` and the + `ranks_in_partition` would be called with + + ```text + [ + (0,1), + (2,2), + (3,4), + ] Review Comment: ```suggestion .. code-block:: python [ (0, 1), (2, 2), (3, 4), ] ``` ########## python/datafusion/udf.py: ########## @@ -246,3 +246,229 @@ def udaf( state_type=state_type, volatility=volatility, ) + + +class WindowEvaluator(metaclass=ABCMeta): + """Evaluator class for user defined window functions (UDWF). + + It is up to the user to decide which evaluate function is appropriate. + + |``uses_window_frame``|``supports_bounded_execution``|``include_rank``|function_to_implement| + |---|---|----|----| + |False (default) |False (default) |False (default) | ``evaluate_all`` | + |False |True |False | ``evaluate`` | + |False |True/False |True | ``evaluate_all_with_rank`` | + |True |True/False |True/False | ``evaluate`` | + """ # noqa: W505 + + def memoize(self) -> None: + """Perform a memoize operation to improve performance. 
+ + When the window frame has a fixed beginning (e.g UNBOUNDED + PRECEDING), some functions such as FIRST_VALUE, LAST_VALUE and + NTH_VALUE do not need the (unbounded) input once they have + seen a certain amount of input. + + `memoize` is called after each input batch is processed, and + such functions can save whatever they need + """ + pass + + def get_range(self, idx: int, num_rows: int) -> tuple[int, int]: + """Return the range for the window fuction. + + If `uses_window_frame` flag is `false`. This method is used to + calculate required range for the window function during + stateful execution. + + Generally there is no required range, hence by default this + returns smallest range(current row). e.g seeing current row is + enough to calculate window result (such as row_number, rank, + etc) + + Args: + idx:: Current index + num_rows: Number of rows. + """ + return (idx, idx + 1) + + def is_causal(self) -> bool: + """Get whether evaluator needs future data for its result.""" + return False + + def evaluate_all(self, values: list[pyarrow.Array], num_rows: int) -> pyarrow.Array: + """Evaluate a window function on an entire input partition. + + This function is called once per input *partition* for window + functions that *do not use* values from the window frame, + such as `ROW_NUMBER`, `RANK`, `DENSE_RANK`, `PERCENT_RANK`, + `CUME_DIST`, `LEAD`, `LAG`). + + It produces the result of all rows in a single pass. It + expects to receive the entire partition as the `value` and + must produce an output column with one output row for every + input row. + + `num_rows` is required to correctly compute the output in case + `values.len() == 0` + + Implementing this function is an optimization: certain window + functions are not affected by the window frame definition or + the query doesn't have a frame, and `evaluate` skips the + (costly) window frame boundary calculation and the overhead of + calling `evaluate` for each output row. 
+ + For example, the `LAG` built in window function does not use + the values of its window frame (it can be computed in one shot + on the entire partition with `Self::evaluate_all` regardless of the + window defined in the `OVER` clause) + + ```sql + lag(x, 1) OVER (ORDER BY z ROWS BETWEEN 2 PRECEDING AND 3 FOLLOWING) + ``` + + However, `avg()` computes the average in the window and thus + does use its window frame + + ```sql + avg(x) OVER (PARTITION BY y ORDER BY z ROWS BETWEEN 2 PRECEDING AND 3 FOLLOWING) + ``` + """ + pass + + def evaluate( + self, values: list[pyarrow.Array], eval_range: tuple[int, int] + ) -> pyarrow.Scalar: + """Evaluate window function on a range of rows in an input partition. + + This is the simplest and most general function to implement + but also the least performant as it creates output one row at + a time. It is typically much faster to implement stateful + evaluation using one of the other specialized methods on this + trait. + + Returns a [`ScalarValue`] that is the value of the window + function within `range` for the entire partition. Argument + `values` contains the evaluation result of function arguments + and evaluation results of ORDER BY expressions. If function has a + single argument, `values[1..]` will contain ORDER BY expression results. + """ + pass + + def evaluate_all_with_rank( + self, num_rows: int, ranks_in_partition: list[tuple[int, int]] + ) -> pyarrow.Array: + """Called for window functions that only need the rank of a row. + + Evaluate the partition evaluator against the partition using + the row ranks. For example, `RANK(col)` produces + + ```text + col | rank + --- + ---- + A | 1 + A | 1 + C | 3 + D | 4 + D | 5 Review Comment: The `rank` for the 2nd `D` should be `4`, right? (That's what I get in the REPL) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
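For reference on the `RANK` question above, here is a minimal sketch of how SQL-style `RANK()` treats ties, in plain Python. The helper name `sql_rank` is hypothetical and not part of datafusion-python, and the sketch assumes the input is already ordered by the ranking column.

```python
def sql_rank(sorted_values):
    """Return SQL-style RANK() values for an already sorted sequence.

    Tied rows share the rank of the first row in their group, and the
    next distinct value resumes at its 1-based row position (so gaps appear).
    """
    ranks = []
    for i, value in enumerate(sorted_values):
        if i > 0 and value == sorted_values[i - 1]:
            # Same value as the previous row: reuse its rank.
            ranks.append(ranks[-1])
        else:
            # New value: rank is the 1-based row position.
            ranks.append(i + 1)
    return ranks


print(sql_rank(["A", "A", "C", "D", "D"]))  # [1, 1, 3, 4, 4]
```

Under these assumptions both `D` rows get rank `4`, which is consistent with the REPL result mentioned in the comment.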
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org