This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 9bd0d7c3ee13 [SPARK-46688][SPARK-46691][PYTHON][CONNECT] Support v2 profiling in aggregate Pandas UDFs
9bd0d7c3ee13 is described below

commit 9bd0d7c3ee135036f5b370ff37517ae9d4d9f155
Author: Xinrong Meng <xinr...@apache.org>
AuthorDate: Wed Feb 7 13:50:09 2024 -0800

    [SPARK-46688][SPARK-46691][PYTHON][CONNECT] Support v2 profiling in aggregate Pandas UDFs
    
    ### What changes were proposed in this pull request?
    Support v2 (perf, memory) profiling in Aggregate (Series to Scalar) Pandas UDFs, which rely on the physical plan nodes AggregateInPandasExec and WindowInPandasExec.
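    
    As a rough illustration of the user-facing flow (a minimal sketch; the DataFrame, column names, and `mean_udf` below are made up for this description, only the `spark.sql.pyspark.udf.profiler` config and `showPerfProfiles` belong to the existing v2 profiler API exercised here):
    
        import pandas as pd
        from pyspark.sql.functions import pandas_udf
    
        @pandas_udf("double")
        def mean_udf(v: pd.Series) -> float:  # Series to Scalar (aggregate) Pandas UDF
            return v.mean()
    
        spark.conf.set("spark.sql.pyspark.udf.profiler", "perf")  # enable v2 perf profiling
        df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0)], ("id", "v"))
        df.groupBy("id").agg(mean_udf("v")).show()  # executed via AggregateInPandasExec
        spark.showPerfProfiles()                    # print the collected per-UDF perf profiles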
    
    ### Why are the changes needed?
    Complete v2 profiling support.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. V2 profiling in Aggregate Pandas UDFs is supported.
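    
    A similar sketch for the memory profiler over a window (WindowInPandasExec), continuing the example above; it assumes the memory-profiler package is installed, and the window spec is illustrative:
    
        from pyspark.sql.window import Window
    
        spark.conf.set("spark.sql.pyspark.udf.profiler", "memory")  # switch to v2 memory profiling
        w = Window.partitionBy("id").orderBy("v").rowsBetween(-1, 0)
        df.withColumn("mean_v", mean_udf("v").over(w)).show()  # executed via WindowInPandasExec
        spark.showMemoryProfiles()  # print line-by-line memory usage per UDF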
    
    ### How was this patch tested?
    Unit tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #45035 from xinrong-meng/other_p.
    
    Lead-authored-by: Xinrong Meng <xinr...@apache.org>
    Co-authored-by: Takuya UESHIN <ues...@databricks.com>
    Signed-off-by: Takuya UESHIN <ues...@databricks.com>
---
 python/pyspark/sql/tests/test_udf_profiler.py      | 61 ++++++++++++++++++++++
 python/pyspark/tests/test_memory_profiler.py       | 61 ++++++++++++++++++++++
 .../execution/python/AggregateInPandasExec.scala   |  3 +-
 .../sql/execution/python/WindowInPandasExec.scala  |  2 +-
 4 files changed, 124 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/sql/tests/test_udf_profiler.py b/python/pyspark/sql/tests/test_udf_profiler.py
index 7e3d8e2dbe55..99719b5475c1 100644
--- a/python/pyspark/sql/tests/test_udf_profiler.py
+++ b/python/pyspark/sql/tests/test_udf_profiler.py
@@ -28,6 +28,7 @@ from typing import Iterator, cast
 from pyspark import SparkConf
 from pyspark.sql import SparkSession
 from pyspark.sql.functions import col, pandas_udf, udf
+from pyspark.sql.window import Window
 from pyspark.profiler import UDFBasicProfiler
 from pyspark.testing.sqlutils import (
     ReusedSQLTestCase,
@@ -333,6 +334,66 @@ class UDFProfiler2TestsMixin:
 
         self.assertEqual(0, len(self.profile_results), str(self.profile_results.keys()))
 
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_pandas_udf_window(self):
+        # WindowInPandasExec
+        import pandas as pd
+
+        @pandas_udf("double")
+        def mean_udf(v: pd.Series) -> float:
+            return v.mean()
+
+        df = self.spark.createDataFrame(
+            [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+        )
+        w = Window.partitionBy("id").orderBy("v").rowsBetween(-1, 0)
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            df.withColumn("mean_v", mean_udf("v").over(w)).show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showPerfProfiles(id)
+
+            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), f"5.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_aggregate_in_pandas(self):
+        # AggregateInPandasExec
+        import pandas as pd
+
+        @pandas_udf("double")
+        def min_udf(v: pd.Series) -> float:
+            return v.min()
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            df = self.spark.createDataFrame(
+                [(2, "Alice"), (3, "Alice"), (5, "Bob"), (10, "Bob")], ["age", 
"name"]
+            )
+            df.groupBy(df.name).agg(min_udf(df.age)).show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showPerfProfiles(id)
+
+            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
 
 class UDFProfiler2Tests(UDFProfiler2TestsMixin, ReusedSQLTestCase):
     def setUp(self) -> None:
diff --git a/python/pyspark/tests/test_memory_profiler.py b/python/pyspark/tests/test_memory_profiler.py
index ae9aa24d1c4f..685a5890200e 100644
--- a/python/pyspark/tests/test_memory_profiler.py
+++ b/python/pyspark/tests/test_memory_profiler.py
@@ -30,6 +30,7 @@ from pyspark import SparkConf, SparkContext
 from pyspark.profiler import has_memory_profiler
 from pyspark.sql import SparkSession
 from pyspark.sql.functions import col, pandas_udf, udf
+from pyspark.sql.window import Window
 from pyspark.testing.sqlutils import (
     have_pandas,
     have_pyarrow,
@@ -380,6 +381,66 @@ class MemoryProfiler2TestsMixin:
 
         self.assertEqual(0, len(self.profile_results), str(self.profile_results.keys()))
 
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_memory_profiler_pandas_udf_window(self):
+        # WindowInPandasExec
+        import pandas as pd
+
+        @pandas_udf("double")
+        def mean_udf(v: pd.Series) -> float:
+            return v.mean()
+
+        df = self.spark.createDataFrame(
+            [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+        )
+        w = Window.partitionBy("id").orderBy("v").rowsBetween(-1, 0)
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
+            df.withColumn("mean_v", mean_udf("v").over(w)).show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showMemoryProfiles(id)
+
+            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_memory_profiler_aggregate_in_pandas(self):
+        # AggregateInPandasExec
+        import pandas as pd
+
+        @pandas_udf("double")
+        def min_udf(v: pd.Series) -> float:
+            return v.min()
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
+            df = self.spark.createDataFrame(
+                [(2, "Alice"), (3, "Alice"), (5, "Bob"), (10, "Bob")], ["age", 
"name"]
+            )
+            df.groupBy(df.name).agg(min_udf(df.age)).show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showMemoryProfiles(id)
+
+            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
 
 class MemoryProfiler2Tests(MemoryProfiler2TestsMixin, ReusedSQLTestCase):
     def setUp(self) -> None:
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
index 876373177447..26871b68dde8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
@@ -181,8 +181,7 @@ case class AggregateInPandasExec(
         pythonRunnerConf,
         pythonMetrics,
         jobArtifactUUID,
-        None) // TODO(SPARK-46688): Support profiling on AggregateInPandasExec
-        .compute(projectedRowIter, context.partitionId(), context)
+        conf.pythonUDFProfiler).compute(projectedRowIter, context.partitionId(), context)
 
       val joinedAttributes =
         groupingExpressions.map(_.toAttribute) ++ aggExpressions.map(_.resultAttribute)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
index c0a38eadbe64..294bcdadc2b2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
@@ -88,7 +88,7 @@ case class WindowInPandasExec(
         child.output,
         longMetric("spillSize"),
         pythonMetrics,
-        None) // TODO(SPARK-46691): Support profiling on WindowInPandasExec
+        conf.pythonUDFProfiler)
 
     // Start processing.
     if (conf.usePartitionEvaluator) {

