This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 743a81f  #819: python bindings for window functions (#819)
743a81f is described below

commit 743a81f4e6cd065333d3e17f7231aba45495db67
Author: Javier Goday <[email protected]>
AuthorDate: Tue Nov 23 01:24:39 2021 +0100

    #819: python bindings for window functions (#819)
    
    Thanks for the contribution!
---
 python/datafusion/tests/test_dataframe.py | 18 +++++++++
 python/src/functions.rs                   | 61 +++++++++++++++++++++++++++++++
 2 files changed, 79 insertions(+)

diff --git a/python/datafusion/tests/test_dataframe.py b/python/datafusion/tests/test_dataframe.py
index 0eb970a..9040b6f 100644
--- a/python/datafusion/tests/test_dataframe.py
+++ b/python/datafusion/tests/test_dataframe.py
@@ -18,6 +18,7 @@
 import pyarrow as pa
 import pytest
 
+from datafusion import functions as f
 from datafusion import DataFrame, ExecutionContext, column, literal, udf
 
 
@@ -117,6 +118,23 @@ def test_join():
     assert table.to_pydict() == expected
 
 
+def test_window_lead(df):
+    df = df.select(
+        column("a"),
+        f.alias(
+            f.window(
+                "lead", [column("b")], order_by=[f.order_by(column("b"))]
+            ),
+            "a_next",
+        ),
+    )
+
+    table = pa.Table.from_batches(df.collect())
+
+    expected = {"a": [1, 2, 3], "a_next": [5, 6, None]}
+    assert table.to_pydict() == expected
+
+
 def test_get_dataframe(tmp_path):
     ctx = ExecutionContext()
 
diff --git a/python/src/functions.rs b/python/src/functions.rs
index a286220..c0b4e59 100644
--- a/python/src/functions.rs
+++ b/python/src/functions.rs
@@ -23,6 +23,7 @@ use datafusion::physical_plan::{
     aggregates::AggregateFunction, functions::BuiltinScalarFunction,
 };
 
+use crate::errors;
 use crate::expression::PyExpr;
 
 #[pyfunction]
@@ -85,6 +86,63 @@ fn concat_ws(sep: String, args: Vec<PyExpr>) -> PyResult<PyExpr> {
     Ok(logical_plan::concat_ws(sep, &args).into())
 }
 
+/// Creates a new Sort expression
+#[pyfunction]
+fn order_by(
+    expr: PyExpr,
+    asc: Option<bool>,
+    nulls_first: Option<bool>,
+) -> PyResult<PyExpr> {
+    Ok(PyExpr {
+        expr: datafusion::logical_plan::Expr::Sort {
+            expr: Box::new(expr.expr),
+            asc: asc.unwrap_or(true),
+            nulls_first: nulls_first.unwrap_or(true),
+        },
+    })
+}
+
+/// Creates a new Alias expression
+#[pyfunction]
+fn alias(expr: PyExpr, name: &str) -> PyResult<PyExpr> {
+    Ok(PyExpr {
+        expr: datafusion::logical_plan::Expr::Alias(
+            Box::new(expr.expr),
+            String::from(name),
+        ),
+    })
+}
+
+/// Creates a new Window function expression
+#[pyfunction]
+fn window(
+    name: &str,
+    args: Vec<PyExpr>,
+    partition_by: Option<Vec<PyExpr>>,
+    order_by: Option<Vec<PyExpr>>,
+) -> PyResult<PyExpr> {
+    use std::str::FromStr;
+    let fun = datafusion::physical_plan::window_functions::WindowFunction::from_str(name)
+        .map_err(|e| -> errors::DataFusionError { e.into() })?;
+    Ok(PyExpr {
+        expr: datafusion::logical_plan::Expr::WindowFunction {
+            fun,
+            args: args.into_iter().map(|x| x.expr).collect::<Vec<_>>(),
+            partition_by: partition_by
+                .unwrap_or(vec![])
+                .into_iter()
+                .map(|x| x.expr)
+                .collect::<Vec<_>>(),
+            order_by: order_by
+                .unwrap_or(vec![])
+                .into_iter()
+                .map(|x| x.expr)
+                .collect::<Vec<_>>(),
+            window_frame: None,
+        },
+    })
+}
+
 macro_rules! scalar_function {
     ($NAME: ident, $FUNC: ident) => {
         scalar_function!($NAME, $FUNC, stringify!($NAME));
@@ -218,6 +276,7 @@ pub(crate) fn init_module(m: &PyModule) -> PyResult<()> {
     m.add_wrapped(wrap_pyfunction!(abs))?;
     m.add_wrapped(wrap_pyfunction!(acos))?;
     m.add_wrapped(wrap_pyfunction!(approx_distinct))?;
+    m.add_wrapped(wrap_pyfunction!(alias))?;
     m.add_wrapped(wrap_pyfunction!(array))?;
     m.add_wrapped(wrap_pyfunction!(ascii))?;
     m.add_wrapped(wrap_pyfunction!(asin))?;
@@ -249,6 +308,7 @@ pub(crate) fn init_module(m: &PyModule) -> PyResult<()> {
     m.add_wrapped(wrap_pyfunction!(min))?;
     m.add_wrapped(wrap_pyfunction!(now))?;
     m.add_wrapped(wrap_pyfunction!(octet_length))?;
+    m.add_wrapped(wrap_pyfunction!(order_by))?;
     m.add_wrapped(wrap_pyfunction!(random))?;
     m.add_wrapped(wrap_pyfunction!(regexp_match))?;
     m.add_wrapped(wrap_pyfunction!(regexp_replace))?;
@@ -278,5 +338,6 @@ pub(crate) fn init_module(m: &PyModule) -> PyResult<()> {
     m.add_wrapped(wrap_pyfunction!(trim))?;
     m.add_wrapped(wrap_pyfunction!(trunc))?;
     m.add_wrapped(wrap_pyfunction!(upper))?;
+    m.add_wrapped(wrap_pyfunction!(window))?;
     Ok(())
 }

Reply via email to