This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new b4c5291fbcb8 [SPARK-53014][PYTHON][DOCS] Make Arrow UDF public
b4c5291fbcb8 is described below

commit b4c5291fbcb8ecb08d87eb9bfeb997b0af12f0da
Author: Ruifeng Zheng <ruife...@apache.org>
AuthorDate: Thu Jul 31 07:43:41 2025 +0900

    [SPARK-53014][PYTHON][DOCS] Make Arrow UDF public

    ### What changes were proposed in this pull request?
    Make Arrow UDF public

    ### Why are the changes needed?
    to provide a way for users to manipulate arrow data directly

    ### Does this PR introduce _any_ user-facing change?
    yes, see the examples added in the PR

    ### How was this patch tested?
    CI

    ### Was this patch authored or co-authored using generative AI tooling?
    No

    Closes #51718 from zhengruifeng/make_arrow_udf_public.

    Authored-by: Ruifeng Zheng <ruife...@apache.org>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 python/pyspark/sql/connect/functions/builtin.py    |   2 +-
 python/pyspark/sql/functions/__init__.py           |   2 +
 python/pyspark/sql/functions/builtin.py            |   7 +-
 python/pyspark/sql/pandas/functions.py             | 270 +++++++++++++++++++++
 python/pyspark/sql/tests/arrow/test_arrow_udf.py   |   3 +-
 .../sql/tests/arrow/test_arrow_udf_grouped_agg.py  |   2 +-
 .../sql/tests/arrow/test_arrow_udf_scalar.py       |   3 +-
 .../sql/tests/arrow/test_arrow_udf_window.py       |   2 +-
 8 files changed, 283 insertions(+), 8 deletions(-)

diff --git a/python/pyspark/sql/connect/functions/builtin.py b/python/pyspark/sql/connect/functions/builtin.py
index 49339f229506..db07dc862012 100644
--- a/python/pyspark/sql/connect/functions/builtin.py
+++ b/python/pyspark/sql/connect/functions/builtin.py
@@ -73,7 +73,7 @@ from pyspark.sql.utils import enum_to_value as _enum_to_value
 
 # The implementation of pandas_udf is embedded in pyspark.sql.function.pandas_udf
 # for code reuse.
-from pyspark.sql.functions import pandas_udf  # noqa: F401
+from pyspark.sql.functions import arrow_udf, pandas_udf  # noqa: F401
 
 if TYPE_CHECKING:
diff --git a/python/pyspark/sql/functions/__init__.py b/python/pyspark/sql/functions/__init__.py
index 930391abdebe..0144ac012e77 100644
--- a/python/pyspark/sql/functions/__init__.py
+++ b/python/pyspark/sql/functions/__init__.py
@@ -503,6 +503,7 @@ __all__ = [  # noqa: F405
     # UDF, UDTF and UDT
     "AnalyzeArgument",
     "AnalyzeResult",
+    "ArrowUDFType",
     "OrderingColumn",
     "PandasUDFType",
     "PartitioningColumn",
@@ -510,6 +511,7 @@ __all__ = [  # noqa: F405
     "SkipRestOfInputTableException",
     "UserDefinedFunction",
     "UserDefinedTableFunction",
+    "arrow_udf",
     "call_udf",
     "pandas_udf",
     "udf",
diff --git a/python/pyspark/sql/functions/builtin.py b/python/pyspark/sql/functions/builtin.py
index d9b88616c72d..2c74905af771 100644
--- a/python/pyspark/sql/functions/builtin.py
+++ b/python/pyspark/sql/functions/builtin.py
@@ -60,7 +60,12 @@ from pyspark.sql.udtf import SkipRestOfInputTableException  # noqa: F401
 from pyspark.sql.udtf import UserDefinedTableFunction, _create_py_udtf
 
 # Keep pandas_udf and PandasUDFType import for backwards compatible import; moved in SPARK-28264
-from pyspark.sql.pandas.functions import pandas_udf, PandasUDFType  # noqa: F401
+from pyspark.sql.pandas.functions import (  # noqa: F401
+    arrow_udf,  # noqa: F401
+    pandas_udf,  # noqa: F401
+    ArrowUDFType,  # noqa: F401
+    PandasUDFType,  # noqa: F401
+)  # noqa: F401
 
 from pyspark.sql.utils import (
     to_str as _to_str,
diff --git a/python/pyspark/sql/pandas/functions.py b/python/pyspark/sql/pandas/functions.py
index b1861efd2ca8..4a2e6db3b99f 100644
--- a/python/pyspark/sql/pandas/functions.py
+++ b/python/pyspark/sql/pandas/functions.py
@@ -52,6 +52,276 @@ class ArrowUDFType:
 
 
 def arrow_udf(f=None, returnType=None, functionType=None):
+    """
+    Creates an arrow user defined function.
+
+    Arrow UDFs are user defined functions that are executed by Spark using Arrow to transfer
+    and work with the data, which allows `pyarrow.Array` operations. An Arrow UDF is defined
+    using the `arrow_udf` as a decorator or to wrap the function, and no additional configuration
+    is required. An Arrow UDF behaves as a regular PySpark function API in general.
+
+    .. versionadded:: 4.1.0
+
+    Parameters
+    ----------
+    f : function, optional
+        user-defined function. A python function if used as a standalone function
+    returnType : :class:`pyspark.sql.types.DataType` or str, optional
+        the return type of the user-defined function. The value can be either a
+        :class:`pyspark.sql.types.DataType` object or a DDL-formatted type string.
+    functionType : int, optional
+        an enum value in :class:`pyspark.sql.functions.ArrowUDFType`.
+        Default: SCALAR. This parameter exists for compatibility.
+        Using Python type hints is encouraged.
+
+    Examples
+    --------
+    In order to use this API, customarily the below are imported:
+
+    >>> import pyarrow as pa
+    >>> from pyspark.sql.functions import arrow_udf
+
+    `Python type hints <https://www.python.org/dev/peps/pep-0484>`_
+    detect the function types as below:
+
+    >>> from pyspark.sql.functions import ArrowUDFType
+    >>> from pyspark.sql.types import IntegerType
+    >>> @arrow_udf(IntegerType())
+    ... def slen(v: pa.Array) -> pa.Array:
+    ...     return pa.compute.utf8_length(v)
+
+    Note that the type hint should use `pyarrow.Array` in all cases.
+
+    * Arrays to Arrays
+        `pyarrow.Array`, ... -> `pyarrow.Array`
+
+        The function takes one or more `pyarrow.Array` and outputs one `pyarrow.Array`.
+        The output of the function should always be of the same length as the input.
+
+        >>> @arrow_udf("string")
+        ... def to_upper(s: pa.Array) -> pa.Array:
+        ...     return pa.compute.ascii_upper(s)
+        ...
+        >>> df = spark.createDataFrame([("John Doe",)], ("name",))
+        >>> df.select(to_upper("name")).show()
+        +--------------+
+        |to_upper(name)|
+        +--------------+
+        |      JOHN DOE|
+        +--------------+
+
+        >>> @arrow_udf("first string, last string")
+        ... def split_expand(v: pa.Array) -> pa.Array:
+        ...     b = pa.compute.ascii_split_whitespace(v)
+        ...     s0 = pa.array([t[0] for t in b])
+        ...     s1 = pa.array([t[1] for t in b])
+        ...     return pa.StructArray.from_arrays([s0, s1], names=["first", "last"])
+        ...
+        >>> df = spark.createDataFrame([("John Doe",)], ("name",))
+        >>> df.select(split_expand("name")).show()
+        +------------------+
+        |split_expand(name)|
+        +------------------+
+        |       {John, Doe}|
+        +------------------+
+
+        This type of Arrow UDF can use keyword arguments:
+
+        >>> from pyspark.sql.functions import col
+        >>> @arrow_udf(returnType=IntegerType())
+        ... def calc(a: pa.Array, b: pa.Array) -> pa.Array:
+        ...     return pa.compute.add(a, pa.compute.multiply(b, 10))
+        ...
+        >>> spark.range(2).select(calc(b=col("id") * 10, a=col("id"))).show()
+        +-----------------------------+
+        |calc(b => (id * 10), a => id)|
+        +-----------------------------+
+        |                            0|
+        |                          101|
+        +-----------------------------+
+
+        .. note:: The length of the input is not that of the whole input column, but is the
+            length of an internal batch used for each call to the function.
+
+    * Iterator of Arrays to Iterator of Arrays
+        `Iterator[pyarrow.Array]` -> `Iterator[pyarrow.Array]`
+
+        The function takes an iterator of `pyarrow.Array` and outputs an iterator of
+        `pyarrow.Array`. In this case, the created arrow UDF instance requires one input
+        column when this is called as a PySpark column. The length of the entire output from
+        the function should be the same as the length of the entire input; therefore, it can
+        prefetch the data from the input iterator as long as the lengths are the same.
+
+        It is also useful when the UDF execution
+        requires initializing some states, although internally it works identically to the
+        Arrays to Arrays case. The pseudocode below illustrates the example.
+
+        .. highlight:: python
+        .. code-block:: python
+
+            @arrow_udf("long")
+            def calculate(iterator: Iterator[pa.Array]) -> Iterator[pa.Array]:
+                # Do some expensive initialization with a state
+                state = very_expensive_initialization()
+                for x in iterator:
+                    # Use that state for whole iterator.
+                    yield calculate_with_state(x, state)

+            df.select(calculate("value")).show()
+
+        >>> import pandas as pd
+        >>> from typing import Iterator
+        >>> @arrow_udf("long")
+        ... def plus_one(iterator: Iterator[pa.Array]) -> Iterator[pa.Array]:
+        ...     for v in iterator:
+        ...         yield pa.compute.add(v, 1)
+        ...
+        >>> df = spark.createDataFrame(pd.DataFrame([1, 2, 3], columns=["v"]))
+        >>> df.select(plus_one(df.v)).show()
+        +-----------+
+        |plus_one(v)|
+        +-----------+
+        |          2|
+        |          3|
+        |          4|
+        +-----------+
+
+        .. note:: The length of each array is the length of a batch internally used.
+
+    * Iterator of Multiple Arrays to Iterator of Arrays
+        `Iterator[Tuple[pyarrow.Array, ...]]` -> `Iterator[pyarrow.Array]`
+
+        The function takes an iterator of a tuple of multiple `pyarrow.Array` and outputs an
+        iterator of `pyarrow.Array`. In this case, the created arrow UDF instance requires as
+        many input columns as there are arrays in the tuple when this is called as a PySpark column.
+        Otherwise, it has the same characteristics and restrictions as the Iterator of Arrays
+        to Iterator of Arrays case.
+
+        >>> from typing import Iterator, Tuple
+        >>> from pyspark.sql.functions import struct, col
+        >>> @arrow_udf("long")
+        ... def multiply(iterator: Iterator[Tuple[pa.Array, pa.Array]]) -> Iterator[pa.Array]:
+        ...     for v1, v2 in iterator:
+        ...         yield pa.compute.multiply(v1, v2.field("v"))
+        ...
+        >>> df = spark.createDataFrame(pd.DataFrame([1, 2, 3], columns=["v"]))
+        >>> df.withColumn('output', multiply(col("v"), struct(col("v")))).show()
+        +---+------+
+        |  v|output|
+        +---+------+
+        |  1|     1|
+        |  2|     4|
+        |  3|     9|
+        +---+------+
+
+        .. note:: The length of each array is the length of a batch internally used.
+
+    * Arrays to Scalar
+        `pyarrow.Array`, ... -> `Any`
+
+        The function takes `pyarrow.Array` and returns a scalar value. The returned scalar
+        can be a python primitive type (e.g., int or float), a numpy data type (e.g.,
+        numpy.int64 or numpy.float64), or a pyarrow.Scalar instance which supports complex
+        return types.
+        `Any` should ideally be a specific scalar type accordingly.
+
+        >>> @arrow_udf("double")
+        ... def mean_udf(v: pa.Array) -> float:
+        ...     return pa.compute.mean(v).as_py()
+        ...
+        >>> df = spark.createDataFrame(
+        ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))
+        >>> df.groupby("id").agg(mean_udf(df['v'])).show()
+        +---+-----------+
+        | id|mean_udf(v)|
+        +---+-----------+
+        |  1|        1.5|
+        |  2|        6.0|
+        +---+-----------+
+
+        The return type can also be a complex type such as struct, list, or map.
+
+        >>> @arrow_udf("struct<m1: double, m2: double>")
+        ... def min_max_udf(v: pa.Array) -> pa.Scalar:
+        ...     m1 = pa.compute.min(v)
+        ...     m2 = pa.compute.max(v)
+        ...     t = pa.struct([pa.field("m1", pa.float64()), pa.field("m2", pa.float64())])
+        ...     return pa.scalar(value={"m1": m1.as_py(), "m2": m2.as_py()}, type=t)
+        ...
+        >>> df = spark.createDataFrame(
+        ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))
+        >>> df.groupby("id").agg(min_max_udf(df['v'])).show()
+        +---+--------------+
+        | id|min_max_udf(v)|
+        +---+--------------+
+        |  1|    {1.0, 2.0}|
+        |  2|   {3.0, 10.0}|
+        +---+--------------+
+
+        This type of Arrow UDF can use keyword arguments:
+
+        >>> @arrow_udf("double")
+        ... def weighted_mean_udf(v: pa.Array, w: pa.Array) -> float:
+        ...     import numpy as np
+        ...     return np.average(v.to_numpy(), weights=w)
+        ...
+        >>> df = spark.createDataFrame(
+        ...     [(1, 1.0, 1.0), (1, 2.0, 2.0), (2, 3.0, 1.0), (2, 5.0, 2.0), (2, 10.0, 3.0)],
+        ...     ("id", "v", "w"))
+        >>> df.groupby("id").agg(weighted_mean_udf(w=df["w"], v=df["v"])).show()
+        +---+---------------------------------+
+        | id|weighted_mean_udf(w => w, v => v)|
+        +---+---------------------------------+
+        |  1|               1.6666666666666667|
+        |  2|                7.166666666666667|
+        +---+---------------------------------+
+
+        This UDF can also be used as a window function as below:
+
+        >>> from pyspark.sql import Window
+        >>> @arrow_udf("double")
+        ... def mean_udf(v: pa.Array) -> float:
+        ...     return pa.compute.mean(v).as_py()
+        ...
+        >>> df = spark.createDataFrame(
+        ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))
+        >>> w = Window.partitionBy('id').orderBy('v').rowsBetween(-1, 0)
+        >>> df.withColumn('mean_v', mean_udf("v").over(w)).show()
+        +---+----+------+
+        | id|   v|mean_v|
+        +---+----+------+
+        |  1| 1.0|   1.0|
+        |  1| 2.0|   1.5|
+        |  2| 3.0|   3.0|
+        |  2| 5.0|   4.0|
+        |  2|10.0|   7.5|
+        +---+----+------+
+
+        .. note:: For performance reasons, the input arrays to window functions are not copied.
+            Therefore, mutating the input arrays is not allowed and will cause incorrect results.
+            For the same reason, users should also not rely on the index of the input arrays.
+
+    Notes
+    -----
+    The user-defined functions do not support conditional expressions or short circuiting
+    in boolean expressions and end up being executed entirely internally. If the functions
+    can fail on special rows, the workaround is to incorporate the condition into the functions.
+
+    The user-defined functions do not take keyword arguments on the calling side.
+
+    The data type of the returned `pyarrow.Array` from the user-defined functions should
+    match the defined `returnType` (see :meth:`types.to_arrow_type` and
+    :meth:`types.from_arrow_type`). When there is a mismatch between them, Spark might do
+    conversion on the returned data. The conversion is not guaranteed to be correct and results
+    should be checked for accuracy by users.
+
+    See Also
+    --------
+    pyspark.sql.GroupedData.agg
+    pyspark.sql.DataFrame.mapInArrow
+    pyspark.sql.GroupedData.applyInArrow
+    pyspark.sql.PandasCogroupedOps.applyInArrow
+    pyspark.sql.UDFRegistration.register
+    """
     return vectorized_udf(f, returnType, functionType, "arrow")
diff --git a/python/pyspark/sql/tests/arrow/test_arrow_udf.py b/python/pyspark/sql/tests/arrow/test_arrow_udf.py
index 052ee1c3be83..e0976e6c8949 100644
--- a/python/pyspark/sql/tests/arrow/test_arrow_udf.py
+++ b/python/pyspark/sql/tests/arrow/test_arrow_udf.py
@@ -18,8 +18,7 @@ import unittest
 
 import datetime
 
-# TODO: import arrow_udf from public API
-from pyspark.sql.pandas.functions import arrow_udf, ArrowUDFType, PandasUDFType
+from pyspark.sql.functions import arrow_udf, ArrowUDFType, PandasUDFType
 from pyspark.sql import functions as F
 from pyspark.sql.types import (
     DoubleType,
diff --git a/python/pyspark/sql/tests/arrow/test_arrow_udf_grouped_agg.py b/python/pyspark/sql/tests/arrow/test_arrow_udf_grouped_agg.py
index 19adc77103a9..5a0ed6606f1a 100644
--- a/python/pyspark/sql/tests/arrow/test_arrow_udf_grouped_agg.py
+++ b/python/pyspark/sql/tests/arrow/test_arrow_udf_grouped_agg.py
@@ -17,7 +17,7 @@
 
 import unittest
 
-from pyspark.sql.pandas.functions import arrow_udf, ArrowUDFType
+from pyspark.sql.functions import arrow_udf, ArrowUDFType
 from pyspark.util import PythonEvalType
 from pyspark.sql import Row
 from pyspark.sql import functions as sf
diff --git a/python/pyspark/sql/tests/arrow/test_arrow_udf_scalar.py b/python/pyspark/sql/tests/arrow/test_arrow_udf_scalar.py
index a3210ec8d1f4..97db29ae6590 100644
--- a/python/pyspark/sql/tests/arrow/test_arrow_udf_scalar.py
+++ b/python/pyspark/sql/tests/arrow/test_arrow_udf_scalar.py
@@ -25,8 +25,7 @@ from typing import Iterator, Tuple
 
 from pyspark.util import PythonEvalType
 
-# TODO: import arrow_udf from public API
-from pyspark.sql.pandas.functions import arrow_udf, ArrowUDFType
+from pyspark.sql.functions import arrow_udf, ArrowUDFType
 from pyspark.sql import functions as F
 from pyspark.sql.types import (
     IntegerType,
diff --git a/python/pyspark/sql/tests/arrow/test_arrow_udf_window.py b/python/pyspark/sql/tests/arrow/test_arrow_udf_window.py
index 9541e798b0b4..42281b9caf49 100644
--- a/python/pyspark/sql/tests/arrow/test_arrow_udf_window.py
+++ b/python/pyspark/sql/tests/arrow/test_arrow_udf_window.py
@@ -17,7 +17,7 @@
 
 import unittest
 
-from pyspark.sql.pandas.functions import arrow_udf, ArrowUDFType
+from pyspark.sql.functions import arrow_udf, ArrowUDFType
 from pyspark.util import PythonEvalType
 from pyspark.sql import functions as sf
 from pyspark.sql.window import Window
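
A minimal usage sketch of the import path made public by this commit, assuming a running
SparkSession bound to `spark`, pyarrow installed, and a hypothetical "name" column:

    import pyarrow as pa
    import pyarrow.compute as pc

    # arrow_udf is now importable from the public pyspark.sql.functions module,
    # mirroring the import change applied to the test files above.
    from pyspark.sql.functions import arrow_udf

    @arrow_udf("string")
    def to_upper(s: pa.Array) -> pa.Array:
        # Operates directly on a pyarrow.Array batch and returns a pyarrow.Array.
        return pc.ascii_upper(s)

    df = spark.createDataFrame([("John Doe",)], ["name"])
    df.select(to_upper("name")).show()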