This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8fd5f1c73c3e [SPARK-54631][PYTHON] Add profiler support for Arrow
Grouped Iter Aggregate UDF
8fd5f1c73c3e is described below
commit 8fd5f1c73c3e4cb393c5875a7244d5211dddb32e
Author: Yicong-Huang <[email protected]>
AuthorDate: Mon Dec 8 07:36:50 2025 +0900
[SPARK-54631][PYTHON] Add profiler support for Arrow Grouped Iter Aggregate
UDF
### What changes were proposed in this pull request?
- `spark.python.profile`: Add `SQL_GROUPED_AGG_ARROW_ITER_UDF` to the
profiler warning list in `udf.py` so that when `spark.python.profile` is
enabled, users will see appropriate warnings consistent with other
iterator-based UDFs.
- `spark.sql.pyspark.udf.profiler`: No changes needed. This UDF type
already works correctly because it returns a scalar (not an iterator), so it
uses the non-iterator profiler branch in `wrap_perf_profiler` and
`wrap_memory_profiler`.
### Why are the changes needed?
To make profiler support for `SQL_GROUPED_AGG_ARROW_ITER_UDF` consistent
with other UDFs.
### Does this PR introduce _any_ user-facing change?
Yes. When users enable `spark.python.profile` with
`SQL_GROUPED_AGG_ARROW_ITER_UDF`, they will now see a warning message
consistent with other iterator-based UDFs.
### How was this patch tested?
Added a test case `test_perf_profiler_arrow_udf_grouped_agg_iter` to verify
that `spark.sql.pyspark.udf.profiler` works correctly with this UDF type. Also
verified that the `spark.python.profile` profiler warning is triggered
correctly in `test_unsupported`.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #53374 from
Yicong-Huang/SPARK-54631/feat/add-profiler-support-for-arrow-grouped-agg-iter-udf.
Authored-by: Yicong-Huang <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/pyspark/sql/tests/test_udf_profiler.py | 51 +++++++++++++++++++++++++++
python/pyspark/sql/udf.py | 1 +
2 files changed, 52 insertions(+)
diff --git a/python/pyspark/sql/tests/test_udf_profiler.py
b/python/pyspark/sql/tests/test_udf_profiler.py
index e6a7bf40b945..4d565ecfd939 100644
--- a/python/pyspark/sql/tests/test_udf_profiler.py
+++ b/python/pyspark/sql/tests/test_udf_profiler.py
@@ -138,6 +138,23 @@ class UDFProfilerTests(unittest.TestCase):
self.spark.range(10).select(iter_to_iter("id")).collect()
+ def exec_arrow_udf_grouped_agg_iter(self):
+ import pyarrow as pa
+
+ @arrow_udf("double")
+ def arrow_mean_iter(it: Iterator[pa.Array]) -> float:
+ sum_val = 0.0
+ cnt = 0
+ for v in it:
+ sum_val += pa.compute.sum(v).as_py()
+ cnt += len(v)
+ return sum_val / cnt if cnt > 0 else 0.0
+
+ df = self.spark.createDataFrame(
+ [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+ )
+ df.groupby("id").agg(arrow_mean_iter(df["v"])).collect()
+
# Unsupported
def exec_map(self):
import pandas as pd
@@ -169,6 +186,15 @@ class UDFProfilerTests(unittest.TestCase):
"Profiling UDFs with iterators input/output is not supported"
in str(user_warns[0])
)
+ with warnings.catch_warnings(record=True) as warns:
+ warnings.simplefilter("always")
+ self.exec_arrow_udf_grouped_agg_iter()
+ user_warns = [warn.message for warn in warns if
isinstance(warn.message, UserWarning)]
+ self.assertTrue(len(user_warns) > 0)
+ self.assertTrue(
+ "Profiling UDFs with iterators input/output is not supported"
in str(user_warns[0])
+ )
+
with warnings.catch_warnings(record=True) as warns:
warnings.simplefilter("always")
self.exec_map()
@@ -486,6 +512,31 @@ class UDFProfiler2TestsMixin:
for id in self.profile_results:
self.assert_udf_profile_present(udf_id=id,
expected_line_count_prefix=2)
+ @unittest.skipIf(not have_pyarrow, pyarrow_requirement_message)
+ def test_perf_profiler_arrow_udf_grouped_agg_iter(self):
+ import pyarrow as pa
+ from typing import Iterator
+
+ @arrow_udf("double")
+ def arrow_mean_iter(it: Iterator[pa.Array]) -> float:
+ sum_val = 0.0
+ cnt = 0
+ for v in it:
+ sum_val += pa.compute.sum(v).as_py()
+ cnt += len(v)
+ return sum_val / cnt if cnt > 0 else 0.0
+
+ with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+ df = self.spark.createDataFrame(
+ [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id",
"v")
+ )
+ df.groupBy(df.id).agg(arrow_mean_iter(df["v"])).show()
+
+ self.assertEqual(1, len(self.profile_results),
str(self.profile_results.keys()))
+
+ for id in self.profile_results:
+ self.assert_udf_profile_present(udf_id=id,
expected_line_count_prefix=2)
+
@unittest.skipIf(
not have_pandas or not have_pyarrow,
cast(str, pandas_requirement_message or pyarrow_requirement_message),
diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py
index c7471d19f7d6..75bcb66efdb8 100644
--- a/python/pyspark/sql/udf.py
+++ b/python/pyspark/sql/udf.py
@@ -438,6 +438,7 @@ class UserDefinedFunction:
PythonEvalType.SQL_SCALAR_ARROW_ITER_UDF,
PythonEvalType.SQL_MAP_PANDAS_ITER_UDF,
PythonEvalType.SQL_MAP_ARROW_ITER_UDF,
+ PythonEvalType.SQL_GROUPED_AGG_ARROW_ITER_UDF,
]:
warnings.warn(
"Profiling UDFs with iterators input/output is not
supported.",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]