This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 2a4c1886d8f0 [SPARK-53393][PYTHON] Disable memory profiler for Arrow Scalar Iterator UDFs
2a4c1886d8f0 is described below

commit 2a4c1886d8f0b4c0ff7c1cf1d351317947ef84d2
Author: Ruifeng Zheng <ruife...@apache.org>
AuthorDate: Wed Aug 27 18:24:27 2025 +0800

    [SPARK-53393][PYTHON] Disable memory profiler for Arrow Scalar Iterator UDFs

    ### What changes were proposed in this pull request?
    Disable memory profiler for Arrow Scalar Iterator UDFs

    ### Why are the changes needed?
    to be consistent with Pandas Scalar Iterator UDFs

    ### Does this PR introduce _any_ user-facing change?
    no, the arrow udf is not yet released

    ### How was this patch tested?
    new test

    ### Was this patch authored or co-authored using generative AI tooling?
    no

    Closes #52138 from zhengruifeng/arrow_udf_iter_profile.

    Authored-by: Ruifeng Zheng <ruife...@apache.org>
    Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
---
 python/pyspark/sql/tests/test_udf_profiler.py | 21 ++++++++++++++++++++-
 python/pyspark/sql/udf.py                     |  1 +
 2 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/tests/test_udf_profiler.py b/python/pyspark/sql/tests/test_udf_profiler.py
index 8c45149605c7..750cc59a93d9 100644
--- a/python/pyspark/sql/tests/test_udf_profiler.py
+++ b/python/pyspark/sql/tests/test_udf_profiler.py
@@ -28,7 +28,7 @@ from typing import Iterator, cast
 from pyspark import SparkConf
 from pyspark.errors import PySparkValueError
 from pyspark.sql import SparkSession
-from pyspark.sql.functions import col, pandas_udf, udf
+from pyspark.sql.functions import col, arrow_udf, pandas_udf, udf
 from pyspark.sql.window import Window
 from pyspark.profiler import UDFBasicProfiler
 from pyspark.testing.sqlutils import ReusedSQLTestCase
@@ -127,6 +127,16 @@ class UDFProfilerTests(unittest.TestCase):
 
         self.spark.range(10).select(iter_to_iter("id")).collect()
 
+    def exec_arrow_udf_iter_to_iter(self):
+        import pyarrow as pa
+
+        @arrow_udf("int")
+        def iter_to_iter(iter: Iterator[pa.Array]) -> Iterator[pa.Array]:
+            for s in iter:
+                yield pa.compute.add(s, 1)
+
+        self.spark.range(10).select(iter_to_iter("id")).collect()
+
     # Unsupported
     def exec_map(self):
         import pandas as pd
@@ -149,6 +159,15 @@ class UDFProfilerTests(unittest.TestCase):
                 "Profiling UDFs with iterators input/output is not supported" in str(user_warns[0])
             )
 
+        with warnings.catch_warnings(record=True) as warns:
+            warnings.simplefilter("always")
+            self.exec_arrow_udf_iter_to_iter()
+            user_warns = [warn.message for warn in warns if isinstance(warn.message, UserWarning)]
+            self.assertTrue(len(user_warns) > 0)
+            self.assertTrue(
+                "Profiling UDFs with iterators input/output is not supported" in str(user_warns[0])
+            )
+
         with warnings.catch_warnings(record=True) as warns:
             warnings.simplefilter("always")
             self.exec_map()
diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py
index 908b209c7151..8908bc48348d 100644
--- a/python/pyspark/sql/udf.py
+++ b/python/pyspark/sql/udf.py
@@ -436,6 +436,7 @@ class UserDefinedFunction:
             # Disable profiling Pandas UDFs with iterators as input/output.
             if self.evalType in [
                 PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF,
+                PythonEvalType.SQL_SCALAR_ARROW_ITER_UDF,
                 PythonEvalType.SQL_MAP_PANDAS_ITER_UDF,
                 PythonEvalType.SQL_MAP_ARROW_ITER_UDF,
             ]:


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org