This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a4e456318eae [SPARK-54738][PYTHON] Add Profiler Support for Pandas Grouped Iter Aggregate UDF
a4e456318eae is described below

commit a4e456318eae32c16f533d155e6c0f2d7c925818
Author: Yicong-Huang <[email protected]>
AuthorDate: Thu Dec 18 11:37:03 2025 +0800

    [SPARK-54738][PYTHON] Add Profiler Support for Pandas Grouped Iter Aggregate UDF
    
    ### What changes were proposed in this pull request?
    
    - `spark.python.profile`: Add `SQL_GROUPED_AGG_PANDAS_ITER_UDF` to the profiler warning list in `udf.py` so that when `spark.python.profile` is enabled, users see a warning consistent with other iterator-based UDFs.
    - `spark.sql.pyspark.udf.profiler`: No changes needed. This UDF type already works correctly because it returns a scalar (not an iterator), so it uses the non-iterator profiler branch in `wrap_perf_profiler` and `wrap_memory_profiler`.
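
The second bullet's reasoning can be illustrated with the standard library alone. This is a minimal sketch, not Spark's `wrap_perf_profiler`: `mean_of_batches` and `profile_call` are hypothetical names, and plain lists stand in for `pd.Series` batches. Because the function consumes its input iterator and returns a scalar, a single profiled call covers all of its work.

```python
import cProfile
import pstats
from typing import Iterator, List


def mean_of_batches(batches: Iterator[List[float]]) -> float:
    """Consume an iterator of batches eagerly and return one scalar."""
    total = 0.0
    count = 0
    for batch in batches:
        total += sum(batch)
        count += len(batch)
    return total / count if count > 0 else 0.0


def profile_call(func, *args):
    """Hypothetical helper: profile one call and return (result, stats)."""
    profiler = cProfile.Profile()
    # runcall executes func(*args) under the profiler and returns its result.
    result = profiler.runcall(func, *args)
    return result, pstats.Stats(profiler)


# The whole loop runs inside the single profiled call, so it is captured.
result, stats = profile_call(mean_of_batches, iter([[1.0, 2.0], [3.0]]))
profiled_names = {key[2] for key in stats.stats}  # keys: (file, line, name)
```

A function that returned a generator instead would hand back a lazy object whose body had not run yet, which is why iterator-returning UDFs need separate handling.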
    
    ### Why are the changes needed?
    
    To make profiler support for `SQL_GROUPED_AGG_PANDAS_ITER_UDF` consistent with other UDFs.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. When users enable `spark.python.profile` with `SQL_GROUPED_AGG_PANDAS_ITER_UDF`, they will now see a warning message consistent with other iterator-based UDFs.
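
A rough sketch of the warning behavior, runnable without Spark. `EvalType` is a hypothetical stand-in for `PythonEvalType` (member values are arbitrary, and `SQL_BATCHED_UDF` is included only as a non-iterator example), and `should_profile` is an illustrative helper, not a function in `udf.py`. Only the warning text is taken from the diff.

```python
import warnings
from enum import Enum, auto


class EvalType(Enum):
    """Hypothetical stand-in for PythonEvalType; values are arbitrary."""

    SQL_BATCHED_UDF = auto()
    SQL_MAP_PANDAS_ITER_UDF = auto()
    SQL_MAP_ARROW_ITER_UDF = auto()
    SQL_GROUPED_AGG_ARROW_ITER_UDF = auto()
    SQL_GROUPED_AGG_PANDAS_ITER_UDF = auto()  # the entry this commit adds


# Eval types for which the legacy profiler warns instead of profiling.
ITERATOR_EVAL_TYPES = frozenset(
    {
        EvalType.SQL_MAP_PANDAS_ITER_UDF,
        EvalType.SQL_MAP_ARROW_ITER_UDF,
        EvalType.SQL_GROUPED_AGG_ARROW_ITER_UDF,
        EvalType.SQL_GROUPED_AGG_PANDAS_ITER_UDF,
    }
)


def should_profile(eval_type: EvalType, profiling_enabled: bool) -> bool:
    """Return True if the UDF would be profiled; warn for iterator types."""
    if not profiling_enabled:
        return False
    if eval_type in ITERATOR_EVAL_TYPES:
        warnings.warn(
            "Profiling UDFs with iterators input/output is not supported.",
            UserWarning,
        )
        return False
    return True


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    profiled = should_profile(EvalType.SQL_GROUPED_AGG_PANDAS_ITER_UDF, True)
```

Here `profiled` is `False` and `caught` holds the single warning, mirroring what a user would see with `spark.python.profile` enabled.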
    
    ### How was this patch tested?
    
    Added a test case `test_perf_profiler_pandas_udf_grouped_agg_iter` to verify that `spark.sql.pyspark.udf.profiler` works correctly with this UDF type. Also verified that the `spark.python.profile` profiler warning is triggered correctly in `test_unsupported`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #53511 from Yicong-Huang/SPARK-54738/feat/add-profiler-support-for-grouped-iter-agg-udf.
    
    Authored-by: Yicong-Huang <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 python/pyspark/sql/tests/test_udf_profiler.py | 27 +++++++++++++++++++++++++++
 python/pyspark/sql/udf.py                     |  1 +
 2 files changed, 28 insertions(+)

diff --git a/python/pyspark/sql/tests/test_udf_profiler.py b/python/pyspark/sql/tests/test_udf_profiler.py
index 4d565ecfd939..09ae6baaee57 100644
--- a/python/pyspark/sql/tests/test_udf_profiler.py
+++ b/python/pyspark/sql/tests/test_udf_profiler.py
@@ -537,6 +537,33 @@ class UDFProfiler2TestsMixin:
         for id in self.profile_results:
             self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=2)
 
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_pandas_udf_grouped_agg_iter(self):
+        import pandas as pd
+
+        @pandas_udf("double")
+        def pandas_mean_iter(it: Iterator[pd.Series]) -> float:
+            sum_val = 0.0
+            cnt = 0
+            for v in it:
+                sum_val += v.sum()
+                cnt += len(v)
+            return sum_val / cnt if cnt > 0 else 0.0
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            df = self.spark.createDataFrame(
+                [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+            )
+            df.groupBy(df.id).agg(pandas_mean_iter(df["v"])).show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=2)
+
     @unittest.skipIf(
         not have_pandas or not have_pyarrow,
         cast(str, pandas_requirement_message or pyarrow_requirement_message),
diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py
index 34c69d1ffda2..61f65d5243b7 100644
--- a/python/pyspark/sql/udf.py
+++ b/python/pyspark/sql/udf.py
@@ -439,6 +439,7 @@ class UserDefinedFunction:
                 PythonEvalType.SQL_MAP_PANDAS_ITER_UDF,
                 PythonEvalType.SQL_MAP_ARROW_ITER_UDF,
                 PythonEvalType.SQL_GROUPED_AGG_ARROW_ITER_UDF,
+                PythonEvalType.SQL_GROUPED_AGG_PANDAS_ITER_UDF,
             ]:
                 warnings.warn(
                     "Profiling UDFs with iterators input/output is not supported.",

