This is an automated email from the ASF dual-hosted git repository.
xinrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 1a66c8c78a46 [SPARK-46689][SPARK-46690][PYTHON][CONNECT] Support v2
profiling in group/cogroup applyInPandas/applyInArrow
1a66c8c78a46 is described below
commit 1a66c8c78a468a5bdc6c033e8c7a26693e4bf62e
Author: Xinrong Meng <[email protected]>
AuthorDate: Thu Feb 8 10:56:28 2024 -0800
[SPARK-46689][SPARK-46690][PYTHON][CONNECT] Support v2 profiling in
group/cogroup applyInPandas/applyInArrow
### What changes were proposed in this pull request?
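Support v2 (perf and memory) profiling in group/cogroup
applyInPandas/applyInArrow, which are executed by the physical plan nodes
FlatMapGroupsInBatchExec and FlatMapCoGroupsInBatchExec. Both nodes
previously passed `None` as the profiler argument when building the Python
runner; they now pass `conf.pythonUDFProfiler`.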
### Why are the changes needed?
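To complete v2 profiling support: FlatMapGroupsInBatchExec and
FlatMapCoGroupsInBatchExec still carried TODOs (SPARK-46689, SPARK-46690)
for profiling.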
### Does this PR introduce _any_ user-facing change?
Yes. V2 profiling is now supported in group/cogroup applyInPandas/applyInArrow.
### How was this patch tested?
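Unit tests: new perf-profiler tests in test_udf_profiler.py and new
memory-profiler tests in test_memory_profiler.py, each covering group and
cogroup applyInPandas and applyInArrow.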
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #45050 from xinrong-meng/other_p2.
Authored-by: Xinrong Meng <[email protected]>
Signed-off-by: Xinrong Meng <[email protected]>
---
python/pyspark/sql/tests/test_udf_profiler.py | 123 +++++++++++++++++++++
python/pyspark/tests/test_memory_profiler.py | 123 +++++++++++++++++++++
.../python/FlatMapCoGroupsInBatchExec.scala | 2 +-
.../python/FlatMapGroupsInBatchExec.scala | 2 +-
4 files changed, 248 insertions(+), 2 deletions(-)
diff --git a/python/pyspark/sql/tests/test_udf_profiler.py
b/python/pyspark/sql/tests/test_udf_profiler.py
index 99719b5475c1..4f767d274414 100644
--- a/python/pyspark/sql/tests/test_udf_profiler.py
+++ b/python/pyspark/sql/tests/test_udf_profiler.py
@@ -394,6 +394,129 @@ class UDFProfiler2TestsMixin:
io.getvalue(),
f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
)
+ @unittest.skipIf(
+ not have_pandas or not have_pyarrow,
+ cast(str, pandas_requirement_message or pyarrow_requirement_message),
+ )
+ def test_perf_profiler_group_apply_in_pandas(self):
+ # FlatMapGroupsInBatchExec
+ df = self.spark.createDataFrame(
+ [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+ )
+
+ def normalize(pdf):
+ v = pdf.v
+ return pdf.assign(v=(v - v.mean()) / v.std())
+
+ with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+ df.groupby("id").applyInPandas(normalize, schema="id long, v
double").show()
+
+ self.assertEqual(1, len(self.profile_results),
str(self.profile_results.keys()))
+
+ for id in self.profile_results:
+ with self.trap_stdout() as io:
+ self.spark.showPerfProfiles(id)
+
+ self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+ self.assertRegex(
+ io.getvalue(),
f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+ )
+
+ @unittest.skipIf(
+ not have_pandas or not have_pyarrow,
+ cast(str, pandas_requirement_message or pyarrow_requirement_message),
+ )
+ def test_perf_profiler_cogroup_apply_in_pandas(self):
+ # FlatMapCoGroupsInBatchExec
+ import pandas as pd
+
+ df1 = self.spark.createDataFrame(
+ [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0),
(20000102, 2, 4.0)],
+ ("time", "id", "v1"),
+ )
+ df2 = self.spark.createDataFrame(
+ [(20000101, 1, "x"), (20000101, 2, "y")], ("time", "id", "v2")
+ )
+
+ def asof_join(left, right):
+ return pd.merge_asof(left, right, on="time", by="id")
+
+ with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+ df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
+ asof_join, schema="time int, id int, v1 double, v2 string"
+ ).show()
+
+ self.assertEqual(1, len(self.profile_results),
str(self.profile_results.keys()))
+
+ for id in self.profile_results:
+ with self.trap_stdout() as io:
+ self.spark.showPerfProfiles(id)
+
+ self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+ self.assertRegex(
+ io.getvalue(),
f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+ )
+
+ @unittest.skipIf(
+ not have_pandas or not have_pyarrow,
+ cast(str, pandas_requirement_message or pyarrow_requirement_message),
+ )
+ def test_perf_profiler_group_apply_in_arrow(self):
+ # FlatMapGroupsInBatchExec
+ import pyarrow.compute as pc
+
+ df = self.spark.createDataFrame(
+ [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+ )
+
+ def normalize(table):
+ v = table.column("v")
+ norm = pc.divide(pc.subtract(v, pc.mean(v)), pc.stddev(v, ddof=1))
+ return table.set_column(1, "v", norm)
+
+ with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+ df.groupby("id").applyInArrow(normalize, schema="id long, v
double").show()
+
+ self.assertEqual(1, len(self.profile_results),
str(self.profile_results.keys()))
+
+ for id in self.profile_results:
+ with self.trap_stdout() as io:
+ self.spark.showPerfProfiles(id)
+
+ self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+ self.assertRegex(
+ io.getvalue(),
f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+ )
+
+ @unittest.skipIf(
+ not have_pandas or not have_pyarrow,
+ cast(str, pandas_requirement_message or pyarrow_requirement_message),
+ )
+ def test_perf_profiler_cogroup_apply_in_arrow(self):
+ import pyarrow as pa
+
+ df1 = self.spark.createDataFrame([(1, 1.0), (2, 2.0), (1, 3.0), (2,
4.0)], ("id", "v1"))
+ df2 = self.spark.createDataFrame([(1, "x"), (2, "y")], ("id", "v2"))
+
+ def summarize(left, right):
+ return pa.Table.from_pydict({"left": [left.num_rows], "right":
[right.num_rows]})
+
+ with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+ df1.groupby("id").cogroup(df2.groupby("id")).applyInArrow(
+ summarize, schema="left long, right long"
+ ).show()
+
+ self.assertEqual(1, len(self.profile_results),
str(self.profile_results.keys()))
+
+ for id in self.profile_results:
+ with self.trap_stdout() as io:
+ self.spark.showPerfProfiles(id)
+
+ self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+ self.assertRegex(
+ io.getvalue(),
f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+ )
+
class UDFProfiler2Tests(UDFProfiler2TestsMixin, ReusedSQLTestCase):
def setUp(self) -> None:
diff --git a/python/pyspark/tests/test_memory_profiler.py
b/python/pyspark/tests/test_memory_profiler.py
index 685a5890200e..536f38679c3e 100644
--- a/python/pyspark/tests/test_memory_profiler.py
+++ b/python/pyspark/tests/test_memory_profiler.py
@@ -441,6 +441,129 @@ class MemoryProfiler2TestsMixin:
io.getvalue(),
f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}"
)
+ @unittest.skipIf(
+ not have_pandas or not have_pyarrow,
+ cast(str, pandas_requirement_message or pyarrow_requirement_message),
+ )
+ def test_memory_profiler_group_apply_in_pandas(self):
+ # FlatMapGroupsInBatchExec
+ df = self.spark.createDataFrame(
+ [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+ )
+
+ def normalize(pdf):
+ v = pdf.v
+ return pdf.assign(v=(v - v.mean()) / v.std())
+
+ with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
+ df.groupby("id").applyInPandas(normalize, schema="id long, v
double").show()
+
+ self.assertEqual(1, len(self.profile_results),
str(self.profile_results.keys()))
+
+ for id in self.profile_results:
+ with self.trap_stdout() as io:
+ self.spark.showMemoryProfiles(id)
+
+ self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+ self.assertRegex(
+ io.getvalue(),
f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}"
+ )
+
+ @unittest.skipIf(
+ not have_pandas or not have_pyarrow,
+ cast(str, pandas_requirement_message or pyarrow_requirement_message),
+ )
+ def test_memory_profiler_cogroup_apply_in_pandas(self):
+ # FlatMapCoGroupsInBatchExec
+ import pandas as pd
+
+ df1 = self.spark.createDataFrame(
+ [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0),
(20000102, 2, 4.0)],
+ ("time", "id", "v1"),
+ )
+ df2 = self.spark.createDataFrame(
+ [(20000101, 1, "x"), (20000101, 2, "y")], ("time", "id", "v2")
+ )
+
+ def asof_join(left, right):
+ return pd.merge_asof(left, right, on="time", by="id")
+
+ with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
+ df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
+ asof_join, schema="time int, id int, v1 double, v2 string"
+ ).show()
+
+ self.assertEqual(1, len(self.profile_results),
str(self.profile_results.keys()))
+
+ for id in self.profile_results:
+ with self.trap_stdout() as io:
+ self.spark.showMemoryProfiles(id)
+
+ self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+ self.assertRegex(
+ io.getvalue(),
f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}"
+ )
+
+ @unittest.skipIf(
+ not have_pandas or not have_pyarrow,
+ cast(str, pandas_requirement_message or pyarrow_requirement_message),
+ )
+ def test_memory_profiler_group_apply_in_arrow(self):
+ # FlatMapGroupsInBatchExec
+ import pyarrow.compute as pc
+
+ df = self.spark.createDataFrame(
+ [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+ )
+
+ def normalize(table):
+ v = table.column("v")
+ norm = pc.divide(pc.subtract(v, pc.mean(v)), pc.stddev(v, ddof=1))
+ return table.set_column(1, "v", norm)
+
+ with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
+ df.groupby("id").applyInArrow(normalize, schema="id long, v
double").show()
+
+ self.assertEqual(1, len(self.profile_results),
str(self.profile_results.keys()))
+
+ for id in self.profile_results:
+ with self.trap_stdout() as io:
+ self.spark.showMemoryProfiles(id)
+
+ self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+ self.assertRegex(
+ io.getvalue(),
f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}"
+ )
+
+ @unittest.skipIf(
+ not have_pandas or not have_pyarrow,
+ cast(str, pandas_requirement_message or pyarrow_requirement_message),
+ )
+ def test_memory_profiler_cogroup_apply_in_arrow(self):
+ import pyarrow as pa
+
+ df1 = self.spark.createDataFrame([(1, 1.0), (2, 2.0), (1, 3.0), (2,
4.0)], ("id", "v1"))
+ df2 = self.spark.createDataFrame([(1, "x"), (2, "y")], ("id", "v2"))
+
+ def summarize(left, right):
+ return pa.Table.from_pydict({"left": [left.num_rows], "right":
[right.num_rows]})
+
+ with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
+ df1.groupby("id").cogroup(df2.groupby("id")).applyInArrow(
+ summarize, schema="left long, right long"
+ ).show()
+
+ self.assertEqual(1, len(self.profile_results),
str(self.profile_results.keys()))
+
+ for id in self.profile_results:
+ with self.trap_stdout() as io:
+ self.spark.showMemoryProfiles(id)
+
+ self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+ self.assertRegex(
+ io.getvalue(),
f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}"
+ )
+
class MemoryProfiler2Tests(MemoryProfiler2TestsMixin, ReusedSQLTestCase):
def setUp(self) -> None:
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInBatchExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInBatchExec.scala
index bc6f9859ec28..66ed2bca7677 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInBatchExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInBatchExec.scala
@@ -87,7 +87,7 @@ trait FlatMapCoGroupsInBatchExec extends SparkPlan with
BinaryExecNode with Pyth
pythonRunnerConf,
pythonMetrics,
jobArtifactUUID,
- None) // TODO(SPARK-46690): Support profiling on
FlatMapCoGroupsInBatchExec
+ conf.pythonUDFProfiler)
executePython(data, output, runner)
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInBatchExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInBatchExec.scala
index 580ef46e842d..e0508483b5a1 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInBatchExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInBatchExec.scala
@@ -92,7 +92,7 @@ trait FlatMapGroupsInBatchExec extends SparkPlan with
UnaryExecNode with PythonS
pythonRunnerConf,
pythonMetrics,
jobArtifactUUID,
- None) // TODO(SPARK-46689): Support profiling on
FlatMapGroupsInBatchExec
+ conf.pythonUDFProfiler)
executePython(data, output, runner)
}}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]