This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-python.git


The following commit(s) were added to refs/heads/main by this push:
     new a3c108f  docs: Example of calling Python UDF & UDAF in SQL (#258)
a3c108f is described below

commit a3c108f5a08e485627dc8550c4e0b1d92de90a75
Author: Dejan Simic <[email protected]>
AuthorDate: Mon Mar 6 15:43:40 2023 +0100

    docs: Example of calling Python UDF & UDAF in SQL (#258)
    
    * Document UDF calls in SQL
    
    * Remove unnecessary imports
    
    * Fix example
---
 README.md                         |  4 ++
 datafusion/__init__.py            |  4 +-
 examples/sql-using-python-udaf.py | 91 +++++++++++++++++++++++++++++++++++++++
 examples/sql-using-python-udf.py  | 65 ++++++++++++++++++++++++++++
 4 files changed, 162 insertions(+), 2 deletions(-)

diff --git a/README.md b/README.md
index 923b6be..7c29def 100644
--- a/README.md
+++ b/README.md
@@ -95,7 +95,11 @@ See [examples](examples/README.md) for more information.
 - [Query a Parquet file using SQL](./examples/sql-parquet.py)
 - [Query a Parquet file using the DataFrame 
API](./examples/dataframe-parquet.py)
 - [Run a SQL query and store the results in a Pandas 
DataFrame](./examples/sql-to-pandas.py)
+- [Run a SQL query with a Python user-defined function 
(UDF)](./examples/sql-using-python-udf.py)
+- [Run a SQL query with a Python user-defined aggregation function 
(UDAF)](./examples/sql-using-python-udaf.py)
 - [Query PyArrow Data](./examples/query-pyarrow-data.py)
+- [Create dataframe](./examples/import.py)
+- [Export dataframe](./examples/export.py)
 
 ### Running User-Defined Python Code
 
diff --git a/datafusion/__init__.py b/datafusion/__init__.py
index f5583c2..a7878e1 100644
--- a/datafusion/__init__.py
+++ b/datafusion/__init__.py
@@ -171,7 +171,7 @@ def udf(func, input_types, return_type, volatility, 
name=None):
     if not callable(func):
         raise TypeError("`func` argument must be callable")
     if name is None:
-        name = func.__qualname__
+        name = func.__qualname__.lower()
     return ScalarUDF(
         name=name,
         func=func,
@@ -190,7 +190,7 @@ def udaf(accum, input_type, return_type, state_type, 
volatility, name=None):
             "`accum` must implement the abstract base class Accumulator"
         )
     if name is None:
-        name = accum.__qualname__
+        name = accum.__qualname__.lower()
     return AggregateUDF(
         name=name,
         accumulator=accum,
diff --git a/examples/sql-using-python-udaf.py 
b/examples/sql-using-python-udaf.py
new file mode 100644
index 0000000..9aacc5d
--- /dev/null
+++ b/examples/sql-using-python-udaf.py
@@ -0,0 +1,91 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datafusion import udaf, SessionContext, Accumulator
+import pyarrow as pa
+
+
+# Define a user-defined aggregation function (UDAF)
+class MyAccumulator(Accumulator):
+    """
+    Interface of a user-defined accumulator.
+    """
+
+    def __init__(self):
+        self._sum = pa.scalar(0.0)
+
+    def update(self, values: pa.Array) -> None:
+        # not nice since pyarrow scalars can't be summed yet. This breaks on 
`None`
+        self._sum = pa.scalar(
+            self._sum.as_py() + pa.compute.sum(values).as_py()
+        )
+
+    def merge(self, states: pa.Array) -> None:
+        # not nice since pyarrow scalars can't be summed yet. This breaks on 
`None`
+        self._sum = pa.scalar(
+            self._sum.as_py() + pa.compute.sum(states).as_py()
+        )
+
+    def state(self) -> pa.Array:
+        return pa.array([self._sum.as_py()])
+
+    def evaluate(self) -> pa.Scalar:
+        return self._sum
+
+
+my_udaf = udaf(
+    MyAccumulator,
+    pa.float64(),
+    pa.float64(),
+    [pa.float64()],
+    "stable",
+    # This will be the name of the UDAF in SQL
+    # If not specified, it will default to the accumulator class name
+    name="my_accumulator",
+)
+
+# Create a context
+ctx = SessionContext()
+
+# Create a datafusion DataFrame from a Python dictionary
+source_df = ctx.from_pydict({"a": [1, 1, 3], "b": [4, 5, 6]})
+# Dataframe:
+# +---+---+
+# | a | b |
+# +---+---+
+# | 1 | 4 |
+# | 1 | 5 |
+# | 3 | 6 |
+# +---+---+
+
+# Register UDAF for use in SQL
+ctx.register_udaf(my_udaf)
+
+# Query the DataFrame using SQL
+table_name = ctx.catalog().database().names().pop()
+result_df = ctx.sql(
+    f"select a, my_accumulator(b) as b_aggregated from {table_name} group by a 
order by a"
+)
+# Dataframe:
+# +---+--------------+
+# | a | b_aggregated |
+# +---+--------------+
+# | 1 | 9            |
+# | 3 | 6            |
+# +---+--------------+
+assert result_df.to_pydict()["a"] == [1, 3]
+assert result_df.to_pydict()["b_aggregated"] == [9, 6]
diff --git a/examples/sql-using-python-udf.py b/examples/sql-using-python-udf.py
new file mode 100644
index 0000000..717b88e
--- /dev/null
+++ b/examples/sql-using-python-udf.py
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datafusion import udf, SessionContext
+import pyarrow as pa
+
+
+# Define a user-defined function (UDF)
+def is_null(array: pa.Array) -> pa.Array:
+    return array.is_null()
+
+
+is_null_arr = udf(
+    is_null,
+    [pa.int64()],
+    pa.bool_(),
+    "stable",
+    # This will be the name of the UDF in SQL
+    # If not specified, it will default to the Python function name
+    name="is_null",
+)
+
+# Create a context
+ctx = SessionContext()
+
+# Create a datafusion DataFrame from a Python dictionary
+source_df = ctx.from_pydict({"a": [1, 2, 3], "b": [4, None, 6]})
+# Dataframe:
+# +---+---+
+# | a | b |
+# +---+---+
+# | 1 | 4 |
+# | 2 |   |
+# | 3 | 6 |
+# +---+---+
+
+# Register UDF for use in SQL
+ctx.register_udf(is_null_arr)
+
+# Query the DataFrame using SQL
+table_name = ctx.catalog().database().names().pop()
+result_df = ctx.sql(f"select a, is_null(b) as b_is_null from {table_name}")
+# Dataframe:
+# +---+-----------+
+# | a | b_is_null |
+# +---+-----------+
+# | 1 | false     |
+# | 2 | true      |
+# | 3 | false     |
+# +---+-----------+
+assert result_df.to_pydict()["b_is_null"] == [False, True, False]

Reply via email to