This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8fd5f1c73c3e [SPARK-54631][PYTHON] Add profiler support for Arrow 
Grouped Iter Aggregate UDF
8fd5f1c73c3e is described below

commit 8fd5f1c73c3e4cb393c5875a7244d5211dddb32e
Author: Yicong-Huang <[email protected]>
AuthorDate: Mon Dec 8 07:36:50 2025 +0900

    [SPARK-54631][PYTHON] Add profiler support for Arrow Grouped Iter Aggregate 
UDF
    
    ### What changes were proposed in this pull request?
    
    - `spark.python.profile`: Add `SQL_GROUPED_AGG_ARROW_ITER_UDF` to the 
profiler warning list in `udf.py` so that when `spark.python.profile` is 
enabled, users will see appropriate warnings consistent with other 
iterator-based UDFs.
    - `spark.sql.pyspark.udf.profiler`: No changes needed. This UDF type 
already works correctly because it returns a scalar (not an iterator), so it 
uses the non-iterator profiler branch in `wrap_perf_profiler` and 
`wrap_memory_profiler`.
    
    ### Why are the changes needed?
    
    To make profiler support for `SQL_GROUPED_AGG_ARROW_ITER_UDF` consistent 
with other UDFs.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. When users enable `spark.python.profile` with 
`SQL_GROUPED_AGG_ARROW_ITER_UDF`, they will now see a warning message 
consistent with other iterator-based UDFs.
    
    ### How was this patch tested?
    
    Added a test case `test_perf_profiler_arrow_udf_grouped_agg_iter` to verify 
that `spark.sql.pyspark.udf.profiler` works correctly with this UDF type. Also 
verified that the `spark.python.profile` profiler warning is triggered 
correctly in `test_unsupported`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #53374 from 
Yicong-Huang/SPARK-54631/feat/add-profiler-support-for-arrow-grouped-agg-iter-udf.
    
    Authored-by: Yicong-Huang <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 python/pyspark/sql/tests/test_udf_profiler.py | 51 +++++++++++++++++++++++++++
 python/pyspark/sql/udf.py                     |  1 +
 2 files changed, 52 insertions(+)

diff --git a/python/pyspark/sql/tests/test_udf_profiler.py 
b/python/pyspark/sql/tests/test_udf_profiler.py
index e6a7bf40b945..4d565ecfd939 100644
--- a/python/pyspark/sql/tests/test_udf_profiler.py
+++ b/python/pyspark/sql/tests/test_udf_profiler.py
@@ -138,6 +138,23 @@ class UDFProfilerTests(unittest.TestCase):
 
         self.spark.range(10).select(iter_to_iter("id")).collect()
 
+    def exec_arrow_udf_grouped_agg_iter(self):
+        import pyarrow as pa
+
+        @arrow_udf("double")
+        def arrow_mean_iter(it: Iterator[pa.Array]) -> float:
+            sum_val = 0.0
+            cnt = 0
+            for v in it:
+                sum_val += pa.compute.sum(v).as_py()
+                cnt += len(v)
+            return sum_val / cnt if cnt > 0 else 0.0
+
+        df = self.spark.createDataFrame(
+            [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+        )
+        df.groupby("id").agg(arrow_mean_iter(df["v"])).collect()
+
     # Unsupported
     def exec_map(self):
         import pandas as pd
@@ -169,6 +186,15 @@ class UDFProfilerTests(unittest.TestCase):
                 "Profiling UDFs with iterators input/output is not supported" 
in str(user_warns[0])
             )
 
+        with warnings.catch_warnings(record=True) as warns:
+            warnings.simplefilter("always")
+            self.exec_arrow_udf_grouped_agg_iter()
+            user_warns = [warn.message for warn in warns if 
isinstance(warn.message, UserWarning)]
+            self.assertTrue(len(user_warns) > 0)
+            self.assertTrue(
+                "Profiling UDFs with iterators input/output is not supported" 
in str(user_warns[0])
+            )
+
         with warnings.catch_warnings(record=True) as warns:
             warnings.simplefilter("always")
             self.exec_map()
@@ -486,6 +512,31 @@ class UDFProfiler2TestsMixin:
         for id in self.profile_results:
             self.assert_udf_profile_present(udf_id=id, 
expected_line_count_prefix=2)
 
+    @unittest.skipIf(not have_pyarrow, pyarrow_requirement_message)
+    def test_perf_profiler_arrow_udf_grouped_agg_iter(self):
+        import pyarrow as pa
+        from typing import Iterator
+
+        @arrow_udf("double")
+        def arrow_mean_iter(it: Iterator[pa.Array]) -> float:
+            sum_val = 0.0
+            cnt = 0
+            for v in it:
+                sum_val += pa.compute.sum(v).as_py()
+                cnt += len(v)
+            return sum_val / cnt if cnt > 0 else 0.0
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            df = self.spark.createDataFrame(
+                [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", 
"v")
+            )
+            df.groupBy(df.id).agg(arrow_mean_iter(df["v"])).show()
+
+        self.assertEqual(1, len(self.profile_results), 
str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            self.assert_udf_profile_present(udf_id=id, 
expected_line_count_prefix=2)
+
     @unittest.skipIf(
         not have_pandas or not have_pyarrow,
         cast(str, pandas_requirement_message or pyarrow_requirement_message),
diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py
index c7471d19f7d6..75bcb66efdb8 100644
--- a/python/pyspark/sql/udf.py
+++ b/python/pyspark/sql/udf.py
@@ -438,6 +438,7 @@ class UserDefinedFunction:
                 PythonEvalType.SQL_SCALAR_ARROW_ITER_UDF,
                 PythonEvalType.SQL_MAP_PANDAS_ITER_UDF,
                 PythonEvalType.SQL_MAP_ARROW_ITER_UDF,
+                PythonEvalType.SQL_GROUPED_AGG_ARROW_ITER_UDF,
             ]:
                 warnings.warn(
                     "Profiling UDFs with iterators input/output is not 
supported.",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to