This is an automated email from the ASF dual-hosted git repository.

xinrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 1a66c8c78a46  [SPARK-46689][SPARK-46690][PYTHON][CONNECT] Support v2 profiling in group/cogroup applyInPandas/applyInArrow
1a66c8c78a46 is described below

commit 1a66c8c78a468a5bdc6c033e8c7a26693e4bf62e
Author: Xinrong Meng <xinr...@apache.org>
AuthorDate: Thu Feb 8 10:56:28 2024 -0800

    [SPARK-46689][SPARK-46690][PYTHON][CONNECT] Support v2 profiling in group/cogroup applyInPandas/applyInArrow

    ### What changes were proposed in this pull request?
    Support v2 (perf, memory) profiling in group/cogroup applyInPandas/applyInArrow, which rely on the physical plan nodes FlatMapGroupsInBatchExec and FlatMapCoGroupsInBatchExec.

    ### Why are the changes needed?
    Complete v2 profiling support.

    ### Does this PR introduce _any_ user-facing change?
    Yes. V2 profiling in group/cogroup applyInPandas/applyInArrow is supported.

    ### How was this patch tested?
    Unit tests.

    ### Was this patch authored or co-authored using generative AI tooling?
    No.

    Closes #45050 from xinrong-meng/other_p2.

    Authored-by: Xinrong Meng <xinr...@apache.org>
    Signed-off-by: Xinrong Meng <xinr...@apache.org>
---
 python/pyspark/sql/tests/test_udf_profiler.py | 123 +++++++++++++++++++++
 python/pyspark/tests/test_memory_profiler.py  | 123 +++++++++++++++++++++
 .../python/FlatMapCoGroupsInBatchExec.scala   |   2 +-
 .../python/FlatMapGroupsInBatchExec.scala     |   2 +-
 4 files changed, 248 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/tests/test_udf_profiler.py b/python/pyspark/sql/tests/test_udf_profiler.py
index 99719b5475c1..4f767d274414 100644
--- a/python/pyspark/sql/tests/test_udf_profiler.py
+++ b/python/pyspark/sql/tests/test_udf_profiler.py
@@ -394,6 +394,129 @@ class UDFProfiler2TestsMixin:
             io.getvalue(), f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
         )
 
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_group_apply_in_pandas(self):
+        # FlatMapGroupsInBatchExec
+        df = self.spark.createDataFrame(
+            [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+        )
+
+        def normalize(pdf):
+            v = pdf.v
+            return pdf.assign(v=(v - v.mean()) / v.std())
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            df.groupby("id").applyInPandas(normalize, schema="id long, v double").show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showPerfProfiles(id)
+
+            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_cogroup_apply_in_pandas(self):
+        # FlatMapCoGroupsInBatchExec
+        import pandas as pd
+
+        df1 = self.spark.createDataFrame(
+            [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
+            ("time", "id", "v1"),
+        )
+        df2 = self.spark.createDataFrame(
+            [(20000101, 1, "x"), (20000101, 2, "y")], ("time", "id", "v2")
+        )
+
+        def asof_join(left, right):
+            return pd.merge_asof(left, right, on="time", by="id")
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
+                asof_join, schema="time int, id int, v1 double, v2 string"
+            ).show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showPerfProfiles(id)
+
+            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_group_apply_in_arrow(self):
+        # FlatMapGroupsInBatchExec
+        import pyarrow.compute as pc
+
+        df = self.spark.createDataFrame(
+            [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+        )
+
+        def normalize(table):
+            v = table.column("v")
+            norm = pc.divide(pc.subtract(v, pc.mean(v)), pc.stddev(v, ddof=1))
+            return table.set_column(1, "v", norm)
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            df.groupby("id").applyInArrow(normalize, schema="id long, v double").show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showPerfProfiles(id)
+
+            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_cogroup_apply_in_arrow(self):
+        import pyarrow as pa
+
+        df1 = self.spark.createDataFrame([(1, 1.0), (2, 2.0), (1, 3.0), (2, 4.0)], ("id", "v1"))
+        df2 = self.spark.createDataFrame([(1, "x"), (2, "y")], ("id", "v2"))
+
+        def summarize(left, right):
+            return pa.Table.from_pydict({"left": [left.num_rows], "right": [right.num_rows]})
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            df1.groupby("id").cogroup(df2.groupby("id")).applyInArrow(
+                summarize, schema="left long, right long"
+            ).show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showPerfProfiles(id)
+
+            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
 
 class UDFProfiler2Tests(UDFProfiler2TestsMixin, ReusedSQLTestCase):
     def setUp(self) -> None:
diff --git a/python/pyspark/tests/test_memory_profiler.py b/python/pyspark/tests/test_memory_profiler.py
index 685a5890200e..536f38679c3e 100644
--- a/python/pyspark/tests/test_memory_profiler.py
+++ b/python/pyspark/tests/test_memory_profiler.py
@@ -441,6 +441,129 @@ class MemoryProfiler2TestsMixin:
             io.getvalue(), f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}"
         )
 
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_memory_profiler_group_apply_in_pandas(self):
+        # FlatMapGroupsInBatchExec
+        df = self.spark.createDataFrame(
+            [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+        )
+
+        def normalize(pdf):
+            v = pdf.v
+            return pdf.assign(v=(v - v.mean()) / v.std())
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
+            df.groupby("id").applyInPandas(normalize, schema="id long, v double").show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showMemoryProfiles(id)
+
+            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_memory_profiler_cogroup_apply_in_pandas(self):
+        # FlatMapCoGroupsInBatchExec
+        import pandas as pd
+
+        df1 = self.spark.createDataFrame(
+            [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
+            ("time", "id", "v1"),
+        )
+        df2 = self.spark.createDataFrame(
+            [(20000101, 1, "x"), (20000101, 2, "y")], ("time", "id", "v2")
+        )
+
+        def asof_join(left, right):
+            return pd.merge_asof(left, right, on="time", by="id")
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
+            df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
+                asof_join, schema="time int, id int, v1 double, v2 string"
+            ).show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showMemoryProfiles(id)
+
+            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_memory_profiler_group_apply_in_arrow(self):
+        # FlatMapGroupsInBatchExec
+        import pyarrow.compute as pc
+
+        df = self.spark.createDataFrame(
+            [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+        )
+
+        def normalize(table):
+            v = table.column("v")
+            norm = pc.divide(pc.subtract(v, pc.mean(v)), pc.stddev(v, ddof=1))
+            return table.set_column(1, "v", norm)
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
+            df.groupby("id").applyInArrow(normalize, schema="id long, v double").show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showMemoryProfiles(id)
+
+            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_memory_profiler_cogroup_apply_in_arrow(self):
+        import pyarrow as pa
+
+        df1 = self.spark.createDataFrame([(1, 1.0), (2, 2.0), (1, 3.0), (2, 4.0)], ("id", "v1"))
+        df2 = self.spark.createDataFrame([(1, "x"), (2, "y")], ("id", "v2"))
+
+        def summarize(left, right):
+            return pa.Table.from_pydict({"left": [left.num_rows], "right": [right.num_rows]})
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
+            df1.groupby("id").cogroup(df2.groupby("id")).applyInArrow(
+                summarize, schema="left long, right long"
+            ).show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showMemoryProfiles(id)
+
+            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
 
 class MemoryProfiler2Tests(MemoryProfiler2TestsMixin, ReusedSQLTestCase):
     def setUp(self) -> None:
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInBatchExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInBatchExec.scala
index bc6f9859ec28..66ed2bca7677 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInBatchExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInBatchExec.scala
@@ -87,7 +87,7 @@ trait FlatMapCoGroupsInBatchExec extends SparkPlan with BinaryExecNode with Pyth
       pythonRunnerConf,
       pythonMetrics,
       jobArtifactUUID,
-      None) // TODO(SPARK-46690): Support profiling on FlatMapCoGroupsInBatchExec
+      conf.pythonUDFProfiler)
 
     executePython(data, output, runner)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInBatchExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInBatchExec.scala
index 580ef46e842d..e0508483b5a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInBatchExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInBatchExec.scala
@@ -92,7 +92,7 @@ trait FlatMapGroupsInBatchExec extends SparkPlan with UnaryExecNode with PythonS
       pythonRunnerConf,
      pythonMetrics,
       jobArtifactUUID,
-      None) // TODO(SPARK-46689): Support profiling on FlatMapGroupsInBatchExec
+      conf.pythonUDFProfiler)
 
     executePython(data, output, runner)
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
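
P.S. For readers of the archive: a minimal sketch of the user-facing flow the
tests above exercise, assuming an existing SparkSession bound to `spark` with
pandas installed. The tests call showPerfProfiles(id) with an explicit UDF id;
calling it with no argument to print all collected profiles is an assumption
of this sketch, not something the commit itself shows.

    def normalize(pdf):
        # Center and scale each group's "v" column.
        v = pdf.v
        return pdf.assign(v=(v - v.mean()) / v.std())

    df = spark.createDataFrame(
        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
    )

    # Enable the v2 perf profiler; the grouped pandas UDF below runs on
    # FlatMapGroupsInBatchExec, which this commit makes profilable.
    spark.conf.set("spark.sql.pyspark.udf.profiler", "perf")
    df.groupby("id").applyInPandas(normalize, schema="id long, v double").show()

    # Dump the collected profiles (omitting the UDF id is assumed above).
    spark.showPerfProfiles()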