This is an automated email from the ASF dual-hosted git repository.
jiayuliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 743a81f #819: python bindings for window functions (#819)
743a81f is described below
commit 743a81f4e6cd065333d3e17f7231aba45495db67
Author: Javier Goday <[email protected]>
AuthorDate: Tue Nov 23 01:24:39 2021 +0100
#819: python bindings for window functions (#819)
Thanks for the contribution!
---
python/datafusion/tests/test_dataframe.py | 18 +++++++++
python/src/functions.rs | 61 +++++++++++++++++++++++++++++++
2 files changed, 79 insertions(+)
diff --git a/python/datafusion/tests/test_dataframe.py b/python/datafusion/tests/test_dataframe.py
index 0eb970a..9040b6f 100644
--- a/python/datafusion/tests/test_dataframe.py
+++ b/python/datafusion/tests/test_dataframe.py
@@ -18,6 +18,7 @@
import pyarrow as pa
import pytest
+from datafusion import functions as f
from datafusion import DataFrame, ExecutionContext, column, literal, udf
@@ -117,6 +118,23 @@ def test_join():
assert table.to_pydict() == expected
+def test_window_lead(df):
+ df = df.select(
+ column("a"),
+ f.alias(
+ f.window(
+ "lead", [column("b")], order_by=[f.order_by(column("b"))]
+ ),
+ "a_next",
+ ),
+ )
+
+ table = pa.Table.from_batches(df.collect())
+
+ expected = {"a": [1, 2, 3], "a_next": [5, 6, None]}
+ assert table.to_pydict() == expected
+
+
def test_get_dataframe(tmp_path):
ctx = ExecutionContext()
diff --git a/python/src/functions.rs b/python/src/functions.rs
index a286220..c0b4e59 100644
--- a/python/src/functions.rs
+++ b/python/src/functions.rs
@@ -23,6 +23,7 @@ use datafusion::physical_plan::{
aggregates::AggregateFunction, functions::BuiltinScalarFunction,
};
+use crate::errors;
use crate::expression::PyExpr;
#[pyfunction]
@@ -85,6 +86,63 @@ fn concat_ws(sep: String, args: Vec<PyExpr>) -> PyResult<PyExpr> {
Ok(logical_plan::concat_ws(sep, &args).into())
}
+/// Creates a new Sort expression
+///
+/// Wraps `expr` in an `Expr::Sort`. When omitted, `asc` and `nulls_first`
+/// both default to `true`.
+#[pyfunction]
+fn order_by(
+    expr: PyExpr,
+    asc: Option<bool>,
+    nulls_first: Option<bool>,
+) -> PyResult<PyExpr> {
+    use datafusion::logical_plan::Expr;
+
+    let ascending = asc.unwrap_or(true);
+    let nulls_lead = nulls_first.unwrap_or(true);
+    let sort = Expr::Sort {
+        expr: Box::new(expr.expr),
+        asc: ascending,
+        nulls_first: nulls_lead,
+    };
+    Ok(PyExpr { expr: sort })
+}
+
+/// Creates a new Alias expression
+///
+/// Wraps `expr` in an `Expr::Alias` that carries an owned copy of `name`.
+#[pyfunction]
+fn alias(expr: PyExpr, name: &str) -> PyResult<PyExpr> {
+    use datafusion::logical_plan::Expr;
+
+    let inner = Box::new(expr.expr);
+    let aliased = Expr::Alias(inner, name.to_owned());
+    Ok(PyExpr { expr: aliased })
+}
+
+/// Creates a new Window function expression
+///
+/// `name` is parsed with `WindowFunction::from_str`; a name that fails to
+/// parse is returned to the caller as an error. `partition_by` and
+/// `order_by` default to empty lists when omitted, and the resulting
+/// expression is built with no window frame.
+#[pyfunction]
+fn window(
+    name: &str,
+    args: Vec<PyExpr>,
+    partition_by: Option<Vec<PyExpr>>,
+    order_by: Option<Vec<PyExpr>>,
+) -> PyResult<PyExpr> {
+    use datafusion::logical_plan::Expr;
+    use datafusion::physical_plan::window_functions::WindowFunction;
+    use std::str::FromStr;
+
+    // Unwrap the Python-facing wrappers into plain logical expressions.
+    fn into_exprs(exprs: Vec<PyExpr>) -> Vec<Expr> {
+        exprs.into_iter().map(|e| e.expr).collect()
+    }
+
+    let fun = WindowFunction::from_str(name)
+        .map_err(|e| -> errors::DataFusionError { e.into() })?;
+    Ok(PyExpr {
+        expr: Expr::WindowFunction {
+            fun,
+            args: into_exprs(args),
+            partition_by: into_exprs(partition_by.unwrap_or_default()),
+            order_by: into_exprs(order_by.unwrap_or_default()),
+            window_frame: None,
+        },
+    })
+}
+
macro_rules! scalar_function {
($NAME: ident, $FUNC: ident) => {
scalar_function!($NAME, $FUNC, stringify!($NAME));
@@ -218,6 +276,7 @@ pub(crate) fn init_module(m: &PyModule) -> PyResult<()> {
m.add_wrapped(wrap_pyfunction!(abs))?;
m.add_wrapped(wrap_pyfunction!(acos))?;
m.add_wrapped(wrap_pyfunction!(approx_distinct))?;
+ m.add_wrapped(wrap_pyfunction!(alias))?;
m.add_wrapped(wrap_pyfunction!(array))?;
m.add_wrapped(wrap_pyfunction!(ascii))?;
m.add_wrapped(wrap_pyfunction!(asin))?;
@@ -249,6 +308,7 @@ pub(crate) fn init_module(m: &PyModule) -> PyResult<()> {
m.add_wrapped(wrap_pyfunction!(min))?;
m.add_wrapped(wrap_pyfunction!(now))?;
m.add_wrapped(wrap_pyfunction!(octet_length))?;
+ m.add_wrapped(wrap_pyfunction!(order_by))?;
m.add_wrapped(wrap_pyfunction!(random))?;
m.add_wrapped(wrap_pyfunction!(regexp_match))?;
m.add_wrapped(wrap_pyfunction!(regexp_replace))?;
@@ -278,5 +338,6 @@ pub(crate) fn init_module(m: &PyModule) -> PyResult<()> {
m.add_wrapped(wrap_pyfunction!(trim))?;
m.add_wrapped(wrap_pyfunction!(trunc))?;
m.add_wrapped(wrap_pyfunction!(upper))?;
+ m.add_wrapped(wrap_pyfunction!(window))?;
Ok(())
}