This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b4c5291fbcb8 [SPARK-53014][PYTHON][DOCS] Make Arrow UDF public
b4c5291fbcb8 is described below

commit b4c5291fbcb8ecb08d87eb9bfeb997b0af12f0da
Author: Ruifeng Zheng <ruife...@apache.org>
AuthorDate: Thu Jul 31 07:43:41 2025 +0900

    [SPARK-53014][PYTHON][DOCS] Make Arrow UDF public
    
    ### What changes were proposed in this pull request?
    Make Arrow UDF public
    
    ### Why are the changes needed?
    to provide a way for users to manipulate arrow data directly
    
    ### Does this PR introduce _any_ user-facing change?
    yes, see the examples added in the PR
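
    For illustration, a minimal sketch of the newly public API, based on the docstring
    examples added below (assumes a running SparkSession bound to `spark`):

        import pyarrow as pa
        from pyspark.sql.functions import arrow_udf

        @arrow_udf("string")
        def to_upper(s: pa.Array) -> pa.Array:
            # operate directly on the underlying Arrow array
            return pa.compute.ascii_upper(s)

        spark.createDataFrame([("John Doe",)], ("name",)).select(to_upper("name")).show()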
    
    ### How was this patch tested?
    CI
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #51718 from zhengruifeng/make_arrow_udf_public.
    
    Authored-by: Ruifeng Zheng <ruife...@apache.org>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 python/pyspark/sql/connect/functions/builtin.py    |   2 +-
 python/pyspark/sql/functions/__init__.py           |   2 +
 python/pyspark/sql/functions/builtin.py            |   7 +-
 python/pyspark/sql/pandas/functions.py             | 270 +++++++++++++++++++++
 python/pyspark/sql/tests/arrow/test_arrow_udf.py   |   3 +-
 .../sql/tests/arrow/test_arrow_udf_grouped_agg.py  |   2 +-
 .../sql/tests/arrow/test_arrow_udf_scalar.py       |   3 +-
 .../sql/tests/arrow/test_arrow_udf_window.py       |   2 +-
 8 files changed, 283 insertions(+), 8 deletions(-)

diff --git a/python/pyspark/sql/connect/functions/builtin.py b/python/pyspark/sql/connect/functions/builtin.py
index 49339f229506..db07dc862012 100644
--- a/python/pyspark/sql/connect/functions/builtin.py
+++ b/python/pyspark/sql/connect/functions/builtin.py
@@ -73,7 +73,7 @@ from pyspark.sql.utils import enum_to_value as _enum_to_value
 
 # The implementation of pandas_udf is embedded in pyspark.sql.function.pandas_udf
 # for code reuse.
-from pyspark.sql.functions import pandas_udf  # noqa: F401
+from pyspark.sql.functions import arrow_udf, pandas_udf  # noqa: F401
 
 
 if TYPE_CHECKING:
diff --git a/python/pyspark/sql/functions/__init__.py b/python/pyspark/sql/functions/__init__.py
index 930391abdebe..0144ac012e77 100644
--- a/python/pyspark/sql/functions/__init__.py
+++ b/python/pyspark/sql/functions/__init__.py
@@ -503,6 +503,7 @@ __all__ = [  # noqa: F405
     # UDF, UDTF and UDT
     "AnalyzeArgument",
     "AnalyzeResult",
+    "ArrowUDFType",
     "OrderingColumn",
     "PandasUDFType",
     "PartitioningColumn",
@@ -510,6 +511,7 @@ __all__ = [  # noqa: F405
     "SkipRestOfInputTableException",
     "UserDefinedFunction",
     "UserDefinedTableFunction",
+    "arrow_udf",
     "call_udf",
     "pandas_udf",
     "udf",
diff --git a/python/pyspark/sql/functions/builtin.py b/python/pyspark/sql/functions/builtin.py
index d9b88616c72d..2c74905af771 100644
--- a/python/pyspark/sql/functions/builtin.py
+++ b/python/pyspark/sql/functions/builtin.py
@@ -60,7 +60,12 @@ from pyspark.sql.udtf import SkipRestOfInputTableException  # noqa: F401
 from pyspark.sql.udtf import UserDefinedTableFunction, _create_py_udtf
 
 # Keep pandas_udf and PandasUDFType import for backwards compatible import; moved in SPARK-28264
-from pyspark.sql.pandas.functions import pandas_udf, PandasUDFType  # noqa: F401
+from pyspark.sql.pandas.functions import (  # noqa: F401
+    arrow_udf,  # noqa: F401
+    pandas_udf,  # noqa: F401
+    ArrowUDFType,  # noqa: F401
+    PandasUDFType,  # noqa: F401
+)  # noqa: F401
 
 from pyspark.sql.utils import (
     to_str as _to_str,
diff --git a/python/pyspark/sql/pandas/functions.py b/python/pyspark/sql/pandas/functions.py
index b1861efd2ca8..4a2e6db3b99f 100644
--- a/python/pyspark/sql/pandas/functions.py
+++ b/python/pyspark/sql/pandas/functions.py
@@ -52,6 +52,276 @@ class ArrowUDFType:
 
 
 def arrow_udf(f=None, returnType=None, functionType=None):
+    """
+    Creates an arrow user defined function.
+
+    Arrow UDFs are user defined functions that are executed by Spark using Arrow to transfer
+    and work with the data, which allows `pyarrow.Array` operations. An Arrow UDF is defined
+    using the `arrow_udf` as a decorator or to wrap the function, and no additional configuration
+    is required. An Arrow UDF behaves as a regular PySpark function API in general.
+
+    .. versionadded:: 4.1.0
+
+    Parameters
+    ----------
+    f : function, optional
+        user-defined function. A python function if used as a standalone function
+    returnType : :class:`pyspark.sql.types.DataType` or str, optional
+        the return type of the user-defined function. The value can be either a
+        :class:`pyspark.sql.types.DataType` object or a DDL-formatted type string.
+    functionType : int, optional
+        an enum value in :class:`pyspark.sql.functions.ArrowUDFType`.
+        Default: SCALAR. This parameter exists for compatibility.
+        Using Python type hints is encouraged.
+
+    Examples
+    --------
+    In order to use this API, customarily the below are imported:
+
+    >>> import pyarrow as pa
+    >>> from pyspark.sql.functions import arrow_udf
+
+    `Python type hints <https://www.python.org/dev/peps/pep-0484>`_
+    detect the function types as below:
+
+    >>> from pyspark.sql.functions import ArrowUDFType
+    >>> from pyspark.sql.types import IntegerType
+    >>> @arrow_udf(IntegerType())
+    ... def slen(v: pa.Array) -> pa.Array:
+    ...     return pa.compute.utf8_length(v)
+
+    Note that the type hint should use `pyarrow.Array` in all cases.
+
+    * Arrays to Arrays
+        `pyarrow.Array`, ... -> `pyarrow.Array`
+
+        The function takes one or more `pyarrow.Array` and outputs one `pyarrow.Array`.
+        The output of the function should always be of the same length as the input.
+
+        >>> @arrow_udf("string")
+        ... def to_upper(s: pa.Array) -> pa.Array:
+        ...     return pa.compute.ascii_upper(s)
+        ...
+        >>> df = spark.createDataFrame([("John Doe",)], ("name",))
+        >>> df.select(to_upper("name")).show()
+        +--------------+
+        |to_upper(name)|
+        +--------------+
+        |      JOHN DOE|
+        +--------------+
+
+        >>> @arrow_udf("first string, last string")
+        ... def split_expand(v: pa.Array) -> pa.Array:
+        ...     b = pa.compute.ascii_split_whitespace(v)
+        ...     s0 = pa.array([t[0] for t in b])
+        ...     s1 = pa.array([t[1] for t in b])
+        ...     return pa.StructArray.from_arrays([s0, s1], names=["first", "last"])
+        ...
+        >>> df = spark.createDataFrame([("John Doe",)], ("name",))
+        >>> df.select(split_expand("name")).show()
+        +------------------+
+        |split_expand(name)|
+        +------------------+
+        |       {John, Doe}|
+        +------------------+
+
+        This type of Arrow UDF can use keyword arguments:
+
+        >>> from pyspark.sql.functions import col
+        >>> @arrow_udf(returnType=IntegerType())
+        ... def calc(a: pa.Array, b: pa.Array) -> pa.Array:
+        ...     return pa.compute.add(a, pa.compute.multiply(b, 10))
+        ...
+        >>> spark.range(2).select(calc(b=col("id") * 10, a=col("id"))).show()
+        +-----------------------------+
+        |calc(b => (id * 10), a => id)|
+        +-----------------------------+
+        |                            0|
+        |                          101|
+        +-----------------------------+
+
+        .. note:: The length of the input is not that of the whole input column, but is the
+            length of an internal batch used for each call to the function.
+
+    * Iterator of Arrays to Iterator of Arrays
+        `Iterator[pyarrow.Array]` -> `Iterator[pyarrow.Array]`
+
+        The function takes an iterator of `pyarrow.Array` and outputs an iterator of
+        `pyarrow.Array`. In this case, the created arrow UDF instance requires one input
+        column when this is called as a PySpark column. The length of the entire output from
+        the function should be the same length as the entire input; therefore, it can
+        prefetch the data from the input iterator as long as the lengths are the same.
+
+        It is also useful when the UDF execution
+        requires initializing some state, although internally it works identically to the
+        Arrays to Arrays case. The pseudocode below illustrates the example.
+
+        .. highlight:: python
+        .. code-block:: python
+
+            @arrow_udf("long")
+            def calculate(iterator: Iterator[pa.Array]) -> Iterator[pa.Array]:
+                # Do some expensive initialization with a state
+                state = very_expensive_initialization()
+                for x in iterator:
+                    # Use that state for whole iterator.
+                    yield calculate_with_state(x, state)
+
+            df.select(calculate("value")).show()
+
+        >>> import pandas as pd
+        >>> from typing import Iterator
+        >>> @arrow_udf("long")
+        ... def plus_one(iterator: Iterator[pa.Array]) -> Iterator[pa.Array]:
+        ...     for v in iterator:
+        ...         yield pa.compute.add(v, 1)
+        ...
+        >>> df = spark.createDataFrame(pd.DataFrame([1, 2, 3], columns=["v"]))
+        >>> df.select(plus_one(df.v)).show()
+        +-----------+
+        |plus_one(v)|
+        +-----------+
+        |          2|
+        |          3|
+        |          4|
+        +-----------+
+
+        .. note:: The length of each array is the length of a batch internally used.
+
+    * Iterator of Multiple Arrays to Iterator of Arrays
+        `Iterator[Tuple[pyarrow.Array, ...]]` -> `Iterator[pyarrow.Array]`
+
+        The function takes an iterator of a tuple of multiple `pyarrow.Array` and outputs an
+        iterator of `pyarrow.Array`. In this case, the created arrow UDF instance requires as many
+        input columns as there are arrays in the tuple when this is called as a PySpark column.
+        Otherwise, it has the same characteristics and restrictions as the Iterator of Arrays
+        to Iterator of Arrays case.
+
+        >>> from typing import Iterator, Tuple
+        >>> from pyspark.sql.functions import struct, col
+        >>> @arrow_udf("long")
+        ... def multiply(iterator: Iterator[Tuple[pa.Array, pa.Array]]) -> Iterator[pa.Array]:
+        ...     for v1, v2 in iterator:
+        ...         yield pa.compute.multiply(v1, v2.field("v"))
+        ...
+        >>> df = spark.createDataFrame(pd.DataFrame([1, 2, 3], columns=["v"]))
+        >>> df.withColumn('output', multiply(col("v"), struct(col("v")))).show()
+        +---+------+
+        |  v|output|
+        +---+------+
+        |  1|     1|
+        |  2|     4|
+        |  3|     9|
+        +---+------+
+
+        .. note:: The length of each array is the length of a batch internally used.
+
+    * Arrays to Scalar
+        `pyarrow.Array`, ... -> `Any`
+
+        The function takes `pyarrow.Array` and returns a scalar value. The returned scalar
+        can be a python primitive type (e.g., int or float), a numpy data type (e.g.,
+        numpy.int64 or numpy.float64), or a pyarrow.Scalar instance which supports complex
+        return types.
+        `Any` should ideally be a specific scalar type accordingly.
+
+        >>> @arrow_udf("double")
+        ... def mean_udf(v: pa.Array) -> float:
+        ...     return pa.compute.mean(v).as_py()
+        ...
+        >>> df = spark.createDataFrame(
+        ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))
+        >>> df.groupby("id").agg(mean_udf(df['v'])).show()
+        +---+-----------+
+        | id|mean_udf(v)|
+        +---+-----------+
+        |  1|        1.5|
+        |  2|        6.0|
+        +---+-----------+
+
+        The return type can also be a complex type such as struct, list, or map.
+        >>> @arrow_udf("struct<m1: double, m2: double>")
+        ... def min_max_udf(v: pa.Array) -> pa.Scalar:
+        ...     m1 = pa.compute.min(v)
+        ...     m2 = pa.compute.max(v)
+        ...     t = pa.struct([pa.field("m1", pa.float64()), pa.field("m2", pa.float64())])
+        ...     return pa.scalar(value={"m1": m1.as_py(), "m2": m2.as_py()}, type=t)
+        ...
+        >>> df = spark.createDataFrame(
+        ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))
+        >>> df.groupby("id").agg(min_max_udf(df['v'])).show()
+        +---+--------------+
+        | id|min_max_udf(v)|
+        +---+--------------+
+        |  1|    {1.0, 2.0}|
+        |  2|   {3.0, 10.0}|
+        +---+--------------+
+
+        This type of Arrow UDF can use keyword arguments:
+
+        >>> @arrow_udf("double")
+        ... def weighted_mean_udf(v: pa.Array, w: pa.Array) -> float:
+        ...     import numpy as np
+        ...     return np.average(v.to_numpy(), weights=w)
+        ...
+        >>> df = spark.createDataFrame(
+        ...     [(1, 1.0, 1.0), (1, 2.0, 2.0), (2, 3.0, 1.0), (2, 5.0, 2.0), (2, 10.0, 3.0)],
+        ...     ("id", "v", "w"))
+        >>> df.groupby("id").agg(weighted_mean_udf(w=df["w"], v=df["v"])).show()
+        +---+---------------------------------+
+        | id|weighted_mean_udf(w => w, v => v)|
+        +---+---------------------------------+
+        |  1|               1.6666666666666667|
+        |  2|                7.166666666666667|
+        +---+---------------------------------+
+
+        This UDF can also be used as a window function as below:
+
+        >>> from pyspark.sql import Window
+        >>> @arrow_udf("double")
+        ... def mean_udf(v: pa.Array) -> float:
+        ...     return pa.compute.mean(v).as_py()
+        ...
+        >>> df = spark.createDataFrame(
+        ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))
+        >>> w = Window.partitionBy('id').orderBy('v').rowsBetween(-1, 0)
+        >>> df.withColumn('mean_v', mean_udf("v").over(w)).show()
+        +---+----+------+
+        | id|   v|mean_v|
+        +---+----+------+
+        |  1| 1.0|   1.0|
+        |  1| 2.0|   1.5|
+        |  2| 3.0|   3.0|
+        |  2| 5.0|   4.0|
+        |  2|10.0|   7.5|
+        +---+----+------+
+
+        .. note:: For performance reasons, the input arrays to window functions are not copied.
+            Therefore, mutating the input arrays is not allowed and will cause incorrect results.
+            For the same reason, users should also not rely on the index of the input arrays.
+
+    Notes
+    -----
+    The user-defined functions do not support conditional expressions or short circuiting
+    in boolean expressions and end up being executed in full internally. If the functions
+    can fail on special rows, the workaround is to incorporate the condition into the functions.
+
+    The user-defined functions do not take keyword arguments on the calling side.
+
+    The data type of returned `pyarrow.Array` from the user-defined functions should be
+    matched with defined `returnType` (see :meth:`types.to_arrow_type` and
+    :meth:`types.from_arrow_type`). When there is a mismatch between them, Spark might do
+    conversion on returned data. The conversion is not guaranteed to be correct and results
+    should be checked for accuracy by users.
+
+    See Also
+    --------
+    pyspark.sql.GroupedData.agg
+    pyspark.sql.DataFrame.mapInArrow
+    pyspark.sql.GroupedData.applyInArrow
+    pyspark.sql.PandasCogroupedOps.applyInArrow
+    pyspark.sql.UDFRegistration.register
+    """
     return vectorized_udf(f, returnType, functionType, "arrow")
 
 
diff --git a/python/pyspark/sql/tests/arrow/test_arrow_udf.py b/python/pyspark/sql/tests/arrow/test_arrow_udf.py
index 052ee1c3be83..e0976e6c8949 100644
--- a/python/pyspark/sql/tests/arrow/test_arrow_udf.py
+++ b/python/pyspark/sql/tests/arrow/test_arrow_udf.py
@@ -18,8 +18,7 @@
 import unittest
 import datetime
 
-# TODO: import arrow_udf from public API
-from pyspark.sql.pandas.functions import arrow_udf, ArrowUDFType, PandasUDFType
+from pyspark.sql.functions import arrow_udf, ArrowUDFType, PandasUDFType
 from pyspark.sql import functions as F
 from pyspark.sql.types import (
     DoubleType,
diff --git a/python/pyspark/sql/tests/arrow/test_arrow_udf_grouped_agg.py b/python/pyspark/sql/tests/arrow/test_arrow_udf_grouped_agg.py
index 19adc77103a9..5a0ed6606f1a 100644
--- a/python/pyspark/sql/tests/arrow/test_arrow_udf_grouped_agg.py
+++ b/python/pyspark/sql/tests/arrow/test_arrow_udf_grouped_agg.py
@@ -17,7 +17,7 @@
 
 import unittest
 
-from pyspark.sql.pandas.functions import arrow_udf, ArrowUDFType
+from pyspark.sql.functions import arrow_udf, ArrowUDFType
 from pyspark.util import PythonEvalType
 from pyspark.sql import Row
 from pyspark.sql import functions as sf
diff --git a/python/pyspark/sql/tests/arrow/test_arrow_udf_scalar.py b/python/pyspark/sql/tests/arrow/test_arrow_udf_scalar.py
index a3210ec8d1f4..97db29ae6590 100644
--- a/python/pyspark/sql/tests/arrow/test_arrow_udf_scalar.py
+++ b/python/pyspark/sql/tests/arrow/test_arrow_udf_scalar.py
@@ -25,8 +25,7 @@ from typing import Iterator, Tuple
 
 from pyspark.util import PythonEvalType
 
-# TODO: import arrow_udf from public API
-from pyspark.sql.pandas.functions import arrow_udf, ArrowUDFType
+from pyspark.sql.functions import arrow_udf, ArrowUDFType
 from pyspark.sql import functions as F
 from pyspark.sql.types import (
     IntegerType,
diff --git a/python/pyspark/sql/tests/arrow/test_arrow_udf_window.py b/python/pyspark/sql/tests/arrow/test_arrow_udf_window.py
index 9541e798b0b4..42281b9caf49 100644
--- a/python/pyspark/sql/tests/arrow/test_arrow_udf_window.py
+++ b/python/pyspark/sql/tests/arrow/test_arrow_udf_window.py
@@ -17,7 +17,7 @@
 
 import unittest
 
-from pyspark.sql.pandas.functions import arrow_udf, ArrowUDFType
+from pyspark.sql.functions import arrow_udf, ArrowUDFType
 from pyspark.util import PythonEvalType
 from pyspark.sql import functions as sf
 from pyspark.sql.window import Window

