This is an automated email from the ASF dual-hosted git repository.
ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 9bd0d7c3ee13 [SPARK-46688][SPARK-46691][PYTHON][CONNECT] Support v2
profiling in aggregate Pandas UDFs
9bd0d7c3ee13 is described below
commit 9bd0d7c3ee135036f5b370ff37517ae9d4d9f155
Author: Xinrong Meng <[email protected]>
AuthorDate: Wed Feb 7 13:50:09 2024 -0800
[SPARK-46688][SPARK-46691][PYTHON][CONNECT] Support v2 profiling in
aggregate Pandas UDFs
### What changes were proposed in this pull request?
Support v2 (perf and memory) profiling in aggregate (Series to Scalar) Pandas
UDFs, which are executed by the physical plan nodes AggregateInPandasExec and
WindowInPandasExec.
### Why are the changes needed?
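To complete v2 profiling support. Until now, AggregateInPandasExec and
WindowInPandasExec passed `None` as the profiler (tracked by the
SPARK-46688 and SPARK-46691 TODOs), so aggregate Pandas UDFs were not profiled.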
### Does this PR introduce _any_ user-facing change?
Yes. V2 profiling is now supported in aggregate Pandas UDFs.
### How was this patch tested?
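Unit tests. New perf-profiler tests in test_udf_profiler.py and
memory-profiler tests in test_memory_profiler.py cover both a windowed
Pandas UDF (WindowInPandasExec) and a grouped aggregate Pandas UDF
(AggregateInPandasExec).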
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #45035 from xinrong-meng/other_p.
Lead-authored-by: Xinrong Meng <[email protected]>
Co-authored-by: Takuya UESHIN <[email protected]>
Signed-off-by: Takuya UESHIN <[email protected]>
---
python/pyspark/sql/tests/test_udf_profiler.py | 61 ++++++++++++++++++++++
python/pyspark/tests/test_memory_profiler.py | 61 ++++++++++++++++++++++
.../execution/python/AggregateInPandasExec.scala | 3 +-
.../sql/execution/python/WindowInPandasExec.scala | 2 +-
4 files changed, 124 insertions(+), 3 deletions(-)
diff --git a/python/pyspark/sql/tests/test_udf_profiler.py
b/python/pyspark/sql/tests/test_udf_profiler.py
index 7e3d8e2dbe55..99719b5475c1 100644
--- a/python/pyspark/sql/tests/test_udf_profiler.py
+++ b/python/pyspark/sql/tests/test_udf_profiler.py
@@ -28,6 +28,7 @@ from typing import Iterator, cast
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pandas_udf, udf
+from pyspark.sql.window import Window
from pyspark.profiler import UDFBasicProfiler
from pyspark.testing.sqlutils import (
ReusedSQLTestCase,
@@ -333,6 +334,66 @@ class UDFProfiler2TestsMixin:
self.assertEqual(0, len(self.profile_results),
str(self.profile_results.keys()))
+ @unittest.skipIf(
+ not have_pandas or not have_pyarrow,
+ cast(str, pandas_requirement_message or pyarrow_requirement_message),
+ )
+ def test_perf_profiler_pandas_udf_window(self):
+ # WindowInPandasExec
+ import pandas as pd
+
+ @pandas_udf("double")
+ def mean_udf(v: pd.Series) -> float:
+ return v.mean()
+
+ df = self.spark.createDataFrame(
+ [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+ )
+ w = Window.partitionBy("id").orderBy("v").rowsBetween(-1, 0)
+
+ with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+ df.withColumn("mean_v", mean_udf("v").over(w)).show()
+
+ self.assertEqual(1, len(self.profile_results),
str(self.profile_results.keys()))
+
+ for id in self.profile_results:
+ with self.trap_stdout() as io:
+ self.spark.showPerfProfiles(id)
+
+ self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+ self.assertRegex(
+ io.getvalue(),
f"5.*{os.path.basename(inspect.getfile(_do_computation))}"
+ )
+
+ @unittest.skipIf(
+ not have_pandas or not have_pyarrow,
+ cast(str, pandas_requirement_message or pyarrow_requirement_message),
+ )
+ def test_perf_profiler_aggregate_in_pandas(self):
+ # AggregateInPandasExec
+ import pandas as pd
+
+ @pandas_udf("double")
+ def min_udf(v: pd.Series) -> float:
+ return v.min()
+
+ with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+ df = self.spark.createDataFrame(
+ [(2, "Alice"), (3, "Alice"), (5, "Bob"), (10, "Bob")], ["age",
"name"]
+ )
+ df.groupBy(df.name).agg(min_udf(df.age)).show()
+
+ self.assertEqual(1, len(self.profile_results),
str(self.profile_results.keys()))
+
+ for id in self.profile_results:
+ with self.trap_stdout() as io:
+ self.spark.showPerfProfiles(id)
+
+ self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+ self.assertRegex(
+ io.getvalue(),
f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+ )
+
class UDFProfiler2Tests(UDFProfiler2TestsMixin, ReusedSQLTestCase):
def setUp(self) -> None:
diff --git a/python/pyspark/tests/test_memory_profiler.py
b/python/pyspark/tests/test_memory_profiler.py
index ae9aa24d1c4f..685a5890200e 100644
--- a/python/pyspark/tests/test_memory_profiler.py
+++ b/python/pyspark/tests/test_memory_profiler.py
@@ -30,6 +30,7 @@ from pyspark import SparkConf, SparkContext
from pyspark.profiler import has_memory_profiler
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pandas_udf, udf
+from pyspark.sql.window import Window
from pyspark.testing.sqlutils import (
have_pandas,
have_pyarrow,
@@ -380,6 +381,66 @@ class MemoryProfiler2TestsMixin:
self.assertEqual(0, len(self.profile_results),
str(self.profile_results.keys()))
+ @unittest.skipIf(
+ not have_pandas or not have_pyarrow,
+ cast(str, pandas_requirement_message or pyarrow_requirement_message),
+ )
+ def test_memory_profiler_pandas_udf_window(self):
+ # WindowInPandasExec
+ import pandas as pd
+
+ @pandas_udf("double")
+ def mean_udf(v: pd.Series) -> float:
+ return v.mean()
+
+ df = self.spark.createDataFrame(
+ [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+ )
+ w = Window.partitionBy("id").orderBy("v").rowsBetween(-1, 0)
+
+ with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
+ df.withColumn("mean_v", mean_udf("v").over(w)).show()
+
+ self.assertEqual(1, len(self.profile_results),
str(self.profile_results.keys()))
+
+ for id in self.profile_results:
+ with self.trap_stdout() as io:
+ self.spark.showMemoryProfiles(id)
+
+ self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+ self.assertRegex(
+ io.getvalue(),
f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}"
+ )
+
+ @unittest.skipIf(
+ not have_pandas or not have_pyarrow,
+ cast(str, pandas_requirement_message or pyarrow_requirement_message),
+ )
+ def test_memory_profiler_aggregate_in_pandas(self):
+ # AggregateInPandasExec
+ import pandas as pd
+
+ @pandas_udf("double")
+ def min_udf(v: pd.Series) -> float:
+ return v.min()
+
+ with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
+ df = self.spark.createDataFrame(
+ [(2, "Alice"), (3, "Alice"), (5, "Bob"), (10, "Bob")], ["age",
"name"]
+ )
+ df.groupBy(df.name).agg(min_udf(df.age)).show()
+
+ self.assertEqual(1, len(self.profile_results),
str(self.profile_results.keys()))
+
+ for id in self.profile_results:
+ with self.trap_stdout() as io:
+ self.spark.showMemoryProfiles(id)
+
+ self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+ self.assertRegex(
+ io.getvalue(),
f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}"
+ )
+
class MemoryProfiler2Tests(MemoryProfiler2TestsMixin, ReusedSQLTestCase):
def setUp(self) -> None:
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
index 876373177447..26871b68dde8 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
@@ -181,8 +181,7 @@ case class AggregateInPandasExec(
pythonRunnerConf,
pythonMetrics,
jobArtifactUUID,
- None) // TODO(SPARK-46688): Support profiling on AggregateInPandasExec
- .compute(projectedRowIter, context.partitionId(), context)
+ conf.pythonUDFProfiler).compute(projectedRowIter,
context.partitionId(), context)
val joinedAttributes =
groupingExpressions.map(_.toAttribute) ++
aggExpressions.map(_.resultAttribute)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
index c0a38eadbe64..294bcdadc2b2 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
@@ -88,7 +88,7 @@ case class WindowInPandasExec(
child.output,
longMetric("spillSize"),
pythonMetrics,
- None) // TODO(SPARK-46691): Support profiling on WindowInPandasExec
+ conf.pythonUDFProfiler)
// Start processing.
if (conf.usePartitionEvaluator) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]