This is an automated email from the ASF dual-hosted git repository.

xinrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a66c8c78a46 [SPARK-46689][SPARK-46690][PYTHON][CONNECT] Support v2 profiling in group/cogroup applyInPandas/applyInArrow
1a66c8c78a46 is described below

commit 1a66c8c78a468a5bdc6c033e8c7a26693e4bf62e
Author: Xinrong Meng <xinr...@apache.org>
AuthorDate: Thu Feb 8 10:56:28 2024 -0800

    [SPARK-46689][SPARK-46690][PYTHON][CONNECT] Support v2 profiling in group/cogroup applyInPandas/applyInArrow
    
    ### What changes were proposed in this pull request?
    Support v2 (perf, memory) profiling in group/cogroup applyInPandas/applyInArrow, which rely on the physical plan nodes FlatMapGroupsInBatchExec and FlatMapCoGroupsInBatchExec.
    
    ### Why are the changes needed?
    Complete v2 profiling support.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. V2 profiling in group/cogroup applyInPandas/applyInArrow is supported.
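    
    For example, a minimal usage sketch assembled from the new unit tests (the profiler
    conf key, the applyInPandas call, and showPerfProfiles are taken from this patch;
    calling showPerfProfiles() with no id to print all collected profiles is an
    assumption here, the tests pass an explicit result id):
    
    ```python
    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder.getOrCreate()
    
    df = spark.createDataFrame(
        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
    )
    
    def normalize(pdf):
        v = pdf.v
        return pdf.assign(v=(v - v.mean()) / v.std())
    
    # Enable the v2 performance profiler; "memory" selects the memory profiler instead.
    spark.conf.set("spark.sql.pyspark.udf.profiler", "perf")
    
    df.groupby("id").applyInPandas(normalize, schema="id long, v double").show()
    
    # Print the collected profiles; showMemoryProfiles is the memory-profiler counterpart.
    spark.showPerfProfiles()
    ```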
    
    ### How was this patch tested?
    Unit tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #45050 from xinrong-meng/other_p2.
    
    Authored-by: Xinrong Meng <xinr...@apache.org>
    Signed-off-by: Xinrong Meng <xinr...@apache.org>
---
 python/pyspark/sql/tests/test_udf_profiler.py      | 123 +++++++++++++++++++++
 python/pyspark/tests/test_memory_profiler.py       | 123 +++++++++++++++++++++
 .../python/FlatMapCoGroupsInBatchExec.scala        |   2 +-
 .../python/FlatMapGroupsInBatchExec.scala          |   2 +-
 4 files changed, 248 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/tests/test_udf_profiler.py b/python/pyspark/sql/tests/test_udf_profiler.py
index 99719b5475c1..4f767d274414 100644
--- a/python/pyspark/sql/tests/test_udf_profiler.py
+++ b/python/pyspark/sql/tests/test_udf_profiler.py
@@ -394,6 +394,129 @@ class UDFProfiler2TestsMixin:
                 io.getvalue(), f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
             )
 
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_group_apply_in_pandas(self):
+        # FlatMapGroupsInBatchExec
+        df = self.spark.createDataFrame(
+            [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+        )
+
+        def normalize(pdf):
+            v = pdf.v
+            return pdf.assign(v=(v - v.mean()) / v.std())
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            df.groupby("id").applyInPandas(normalize, schema="id long, v double").show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showPerfProfiles(id)
+
+            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_cogroup_apply_in_pandas(self):
+        # FlatMapCoGroupsInBatchExec
+        import pandas as pd
+
+        df1 = self.spark.createDataFrame(
+            [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
+            ("time", "id", "v1"),
+        )
+        df2 = self.spark.createDataFrame(
+            [(20000101, 1, "x"), (20000101, 2, "y")], ("time", "id", "v2")
+        )
+
+        def asof_join(left, right):
+            return pd.merge_asof(left, right, on="time", by="id")
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
+                asof_join, schema="time int, id int, v1 double, v2 string"
+            ).show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showPerfProfiles(id)
+
+            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_group_apply_in_arrow(self):
+        # FlatMapGroupsInBatchExec
+        import pyarrow.compute as pc
+
+        df = self.spark.createDataFrame(
+            [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+        )
+
+        def normalize(table):
+            v = table.column("v")
+            norm = pc.divide(pc.subtract(v, pc.mean(v)), pc.stddev(v, ddof=1))
+            return table.set_column(1, "v", norm)
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            df.groupby("id").applyInArrow(normalize, schema="id long, v double").show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showPerfProfiles(id)
+
+            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_cogroup_apply_in_arrow(self):
+        import pyarrow as pa
+
+        df1 = self.spark.createDataFrame([(1, 1.0), (2, 2.0), (1, 3.0), (2, 4.0)], ("id", "v1"))
+        df2 = self.spark.createDataFrame([(1, "x"), (2, "y")], ("id", "v2"))
+
+        def summarize(left, right):
+            return pa.Table.from_pydict({"left": [left.num_rows], "right": [right.num_rows]})
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            df1.groupby("id").cogroup(df2.groupby("id")).applyInArrow(
+                summarize, schema="left long, right long"
+            ).show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showPerfProfiles(id)
+
+            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
 
 class UDFProfiler2Tests(UDFProfiler2TestsMixin, ReusedSQLTestCase):
     def setUp(self) -> None:
diff --git a/python/pyspark/tests/test_memory_profiler.py b/python/pyspark/tests/test_memory_profiler.py
index 685a5890200e..536f38679c3e 100644
--- a/python/pyspark/tests/test_memory_profiler.py
+++ b/python/pyspark/tests/test_memory_profiler.py
@@ -441,6 +441,129 @@ class MemoryProfiler2TestsMixin:
                 io.getvalue(), f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}"
             )
 
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_memory_profiler_group_apply_in_pandas(self):
+        # FlatMapGroupsInBatchExec
+        df = self.spark.createDataFrame(
+            [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+        )
+
+        def normalize(pdf):
+            v = pdf.v
+            return pdf.assign(v=(v - v.mean()) / v.std())
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
+            df.groupby("id").applyInPandas(normalize, schema="id long, v double").show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showMemoryProfiles(id)
+
+            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_memory_profiler_cogroup_apply_in_pandas(self):
+        # FlatMapCoGroupsInBatchExec
+        import pandas as pd
+
+        df1 = self.spark.createDataFrame(
+            [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
+            ("time", "id", "v1"),
+        )
+        df2 = self.spark.createDataFrame(
+            [(20000101, 1, "x"), (20000101, 2, "y")], ("time", "id", "v2")
+        )
+
+        def asof_join(left, right):
+            return pd.merge_asof(left, right, on="time", by="id")
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
+            df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
+                asof_join, schema="time int, id int, v1 double, v2 string"
+            ).show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showMemoryProfiles(id)
+
+            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_memory_profiler_group_apply_in_arrow(self):
+        # FlatMapGroupsInBatchExec
+        import pyarrow.compute as pc
+
+        df = self.spark.createDataFrame(
+            [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+        )
+
+        def normalize(table):
+            v = table.column("v")
+            norm = pc.divide(pc.subtract(v, pc.mean(v)), pc.stddev(v, ddof=1))
+            return table.set_column(1, "v", norm)
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
+            df.groupby("id").applyInArrow(normalize, schema="id long, v double").show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showMemoryProfiles(id)
+
+            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_memory_profiler_cogroup_apply_in_arrow(self):
+        import pyarrow as pa
+
+        df1 = self.spark.createDataFrame([(1, 1.0), (2, 2.0), (1, 3.0), (2, 4.0)], ("id", "v1"))
+        df2 = self.spark.createDataFrame([(1, "x"), (2, "y")], ("id", "v2"))
+
+        def summarize(left, right):
+            return pa.Table.from_pydict({"left": [left.num_rows], "right": [right.num_rows]})
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
+            df1.groupby("id").cogroup(df2.groupby("id")).applyInArrow(
+                summarize, schema="left long, right long"
+            ).show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showMemoryProfiles(id)
+
+            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
 
 class MemoryProfiler2Tests(MemoryProfiler2TestsMixin, ReusedSQLTestCase):
     def setUp(self) -> None:
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInBatchExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInBatchExec.scala
index bc6f9859ec28..66ed2bca7677 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInBatchExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInBatchExec.scala
@@ -87,7 +87,7 @@ trait FlatMapCoGroupsInBatchExec extends SparkPlan with BinaryExecNode with Pyth
           pythonRunnerConf,
           pythonMetrics,
           jobArtifactUUID,
-          None) // TODO(SPARK-46690): Support profiling on FlatMapCoGroupsInBatchExec
+          conf.pythonUDFProfiler)
 
         executePython(data, output, runner)
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInBatchExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInBatchExec.scala
index 580ef46e842d..e0508483b5a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInBatchExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInBatchExec.scala
@@ -92,7 +92,7 @@ trait FlatMapGroupsInBatchExec extends SparkPlan with UnaryExecNode with PythonS
         pythonRunnerConf,
         pythonMetrics,
         jobArtifactUUID,
-        None) // TODO(SPARK-46689): Support profiling on FlatMapGroupsInBatchExec
+        conf.pythonUDFProfiler)
 
       executePython(data, output, runner)
     }}

