This is an automated email from the ASF dual-hosted git repository.
xinrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 501999a834ea [SPARK-47276][PYTHON][CONNECT] Introduce
`spark.profile.clear` for SparkSession-based profiling
501999a834ea is described below
commit 501999a834ea7761a792b823c543e40fba84231d
Author: Xinrong Meng <[email protected]>
AuthorDate: Thu Mar 7 13:20:39 2024 -0800
[SPARK-47276][PYTHON][CONNECT] Introduce `spark.profile.clear` for
SparkSession-based profiling
### What changes were proposed in this pull request?
Introduce `spark.profile.clear` for SparkSession-based profiling.
### Why are the changes needed?
A straightforward and unified interface for managing and resetting
profiling results for SparkSession-based profilers.
### Does this PR introduce _any_ user-facing change?
Yes. `spark.profile.clear` is supported as shown below.
Preparation:
```py
>>> from pyspark.sql.functions import pandas_udf
>>> df = spark.range(3)
>>> pandas_udf("long")
... def add1(x):
... return x + 1
...
>>> added = df.select(add1("id"))
>>> spark.conf.set("spark.sql.pyspark.udf.profiler", "perf")
>>> added.show()
+--------+
|add1(id)|
+--------+
...
+--------+
>>> spark.profile.show()
============================================================
Profile of UDF<id=2>
============================================================
1410 function calls (1374 primitive calls) in 0.004 seconds
...
```
Example usage:
```py
>>> spark.profile.profiler_collector._profile_results
{2: (<pstats.Stats object at 0x7ff6484d22e0>, None)}
>>> spark.profile.clear(1) # id mismatch
>>> spark.profile.profiler_collector._profile_results
{2: (<pstats.Stats object at 0x7ff6484d22e0>, None)}
>>> spark.profile.clear(type="memory") # type mismatch
>>> spark.profile.profiler_collector._profile_results
{2: (<pstats.Stats object at 0x7ff6484d22e0>, None)}
>>> spark.profile.clear() # clear all
>>> spark.profile.profiler_collector._profile_results
{}
>>> spark.profile.show()
>>>
```
### How was this patch tested?
Unit tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #45378 from xinrong-meng/profile_clear.
Authored-by: Xinrong Meng <[email protected]>
Signed-off-by: Xinrong Meng <[email protected]>
---
python/pyspark/sql/profiler.py | 79 +++++++++++++++++++++++++++
python/pyspark/sql/tests/test_session.py | 27 +++++++++
python/pyspark/sql/tests/test_udf_profiler.py | 26 +++++++++
python/pyspark/tests/test_memory_profiler.py | 59 ++++++++++++++++++++
4 files changed, 191 insertions(+)
diff --git a/python/pyspark/sql/profiler.py b/python/pyspark/sql/profiler.py
index 5ab27bce2582..711e39de4723 100644
--- a/python/pyspark/sql/profiler.py
+++ b/python/pyspark/sql/profiler.py
@@ -224,6 +224,56 @@ class ProfilerCollector(ABC):
for id in sorted(code_map.keys()):
dump(id)
+ def clear_perf_profiles(self, id: Optional[int] = None) -> None:
+ """
+ Clear the perf profile results.
+
+ .. versionadded:: 4.0.0
+
+ Parameters
+ ----------
+ id : int, optional
+ The UDF ID whose profiling results should be cleared.
+ If not specified, all the results will be cleared.
+ """
+ with self._lock:
+ if id is not None:
+ if id in self._profile_results:
+ perf, mem, *_ = self._profile_results[id]
+ self._profile_results[id] = (None, mem, *_)
+ if mem is None:
+ self._profile_results.pop(id, None)
+ else:
+ for id, (perf, mem, *_) in list(self._profile_results.items()):
+ self._profile_results[id] = (None, mem, *_)
+ if mem is None:
+ self._profile_results.pop(id, None)
+
+ def clear_memory_profiles(self, id: Optional[int] = None) -> None:
+ """
+ Clear the memory profile results.
+
+ .. versionadded:: 4.0.0
+
+ Parameters
+ ----------
+ id : int, optional
+ The UDF ID whose profiling results should be cleared.
+ If not specified, all the results will be cleared.
+ """
+ with self._lock:
+ if id is not None:
+ if id in self._profile_results:
+ perf, mem, *_ = self._profile_results[id]
+ self._profile_results[id] = (perf, None, *_)
+ if perf is None:
+ self._profile_results.pop(id, None)
+ else:
+ for id, (perf, mem, *_) in list(self._profile_results.items()):
+ self._profile_results[id] = (perf, None, *_)
+ if perf is None:
+ self._profile_results.pop(id, None)
+
class AccumulatorProfilerCollector(ProfilerCollector):
def __init__(self) -> None:
@@ -309,3 +359,32 @@ class Profile:
"allowed_values": str(["perf", "memory"]),
},
)
+
+ def clear(self, id: Optional[int] = None, *, type: Optional[str] = None) -> None:
+ """
+ Clear the profile results.
+
+ .. versionadded:: 4.0.0
+
+ Parameters
+ ----------
+ id : int, optional
+ The UDF ID whose profiling results should be cleared.
+ If not specified, all the results will be cleared.
+ type : str, optional
+ The profiler type to clear results for, which can be either "perf" or "memory".
+ """
+ if type == "memory":
+ self.profiler_collector.clear_memory_profiles(id)
+ elif type == "perf" or type is None:
+ self.profiler_collector.clear_perf_profiles(id)
+ if type is None: # Clear both perf and memory profiles
+ self.profiler_collector.clear_memory_profiles(id)
+ else:
+ raise PySparkValueError(
+ error_class="VALUE_NOT_ALLOWED",
+ message_parameters={
+ "arg_name": "type",
+ "allowed_values": str(["perf", "memory"]),
+ },
+ )
diff --git a/python/pyspark/sql/tests/test_session.py b/python/pyspark/sql/tests/test_session.py
index b95e9de9e3f3..5f102d770c6a 100644
--- a/python/pyspark/sql/tests/test_session.py
+++ b/python/pyspark/sql/tests/test_session.py
@@ -531,6 +531,33 @@ class SparkSessionProfileTests(unittest.TestCase, PySparkErrorTestUtils):
},
)
+ def test_clear_memory_type(self):
+ self.profile.clear(type="memory")
+ self.profiler_collector_mock.clear_memory_profiles.assert_called_once()
+ self.profiler_collector_mock.clear_perf_profiles.assert_not_called()
+
+ def test_clear_perf_type(self):
+ self.profile.clear(type="perf")
+ self.profiler_collector_mock.clear_perf_profiles.assert_called_once()
+ self.profiler_collector_mock.clear_memory_profiles.assert_not_called()
+
+ def test_clear_no_type(self):
+ self.profile.clear()
+ self.profiler_collector_mock.clear_perf_profiles.assert_called_once()
+ self.profiler_collector_mock.clear_memory_profiles.assert_called_once()
+
+ def test_clear_invalid_type(self):
+ with self.assertRaises(PySparkValueError) as e:
+ self.profile.clear(type="invalid")
+ self.check_error(
+ exception=e.exception,
+ error_class="VALUE_NOT_ALLOWED",
+ message_parameters={
+ "arg_name": "type",
+ "allowed_values": str(["perf", "memory"]),
+ },
+ )
+
class SparkExtensionsTest(unittest.TestCase):
# These tests are separate because it uses 'spark.sql.extensions' which is
diff --git a/python/pyspark/sql/tests/test_udf_profiler.py b/python/pyspark/sql/tests/test_udf_profiler.py
index 557b4daa8550..a66503bc0213 100644
--- a/python/pyspark/sql/tests/test_udf_profiler.py
+++ b/python/pyspark/sql/tests/test_udf_profiler.py
@@ -521,6 +521,32 @@ class UDFProfiler2TestsMixin:
io.getvalue(), f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
)
+ def test_perf_profiler_clear(self):
+ with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+ _do_computation(self.spark)
+ self.assertEqual(3, len(self.profile_results), str(list(self.profile_results)))
+
+ for id in self.profile_results:
+ self.spark.profile.clear(id)
+ self.assertNotIn(id, self.profile_results)
+ self.assertEqual(0, len(self.profile_results), str(list(self.profile_results)))
+
+ with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+ _do_computation(self.spark)
+ self.assertEqual(3, len(self.profile_results), str(list(self.profile_results)))
+
+ self.spark.profile.clear(type="memory")
+ self.assertEqual(3, len(self.profile_results), str(list(self.profile_results)))
+ self.spark.profile.clear(type="perf")
+ self.assertEqual(0, len(self.profile_results), str(list(self.profile_results)))
+
+ with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+ _do_computation(self.spark)
+ self.assertEqual(3, len(self.profile_results), str(list(self.profile_results)))
+
+ self.spark.profile.clear()
+ self.assertEqual(0, len(self.profile_results), str(list(self.profile_results)))
+
class UDFProfiler2Tests(UDFProfiler2TestsMixin, ReusedSQLTestCase):
def setUp(self) -> None:
diff --git a/python/pyspark/tests/test_memory_profiler.py b/python/pyspark/tests/test_memory_profiler.py
index f0abdd03e243..046dd3621c42 100644
--- a/python/pyspark/tests/test_memory_profiler.py
+++ b/python/pyspark/tests/test_memory_profiler.py
@@ -221,6 +221,10 @@ class MemoryProfiler2TestsMixin:
def profile_results(self):
return self.spark._profiler_collector._memory_profile_results
+ @property
+ def perf_profile_results(self):
+ return self.spark._profiler_collector._perf_profile_results
+
def test_memory_profiler_udf(self):
_do_computation(self.spark)
@@ -571,6 +575,61 @@ class MemoryProfiler2TestsMixin:
io.getvalue(), f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}"
)
+ def test_memory_profiler_clear(self):
+ with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
+ _do_computation(self.spark)
+ self.assertEqual(3, len(self.profile_results), str(list(self.profile_results)))
+
+ for id in list(self.profile_results.keys()):
+ self.spark.profile.clear(id)
+ self.assertNotIn(id, self.profile_results)
+ self.assertEqual(0, len(self.profile_results), str(list(self.profile_results)))
+
+ with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
+ _do_computation(self.spark)
+ self.assertEqual(3, len(self.profile_results), str(list(self.profile_results)))
+
+ self.spark.profile.clear(type="perf")
+ self.assertEqual(3, len(self.profile_results), str(list(self.profile_results)))
+ self.spark.profile.clear(type="memory")
+ self.assertEqual(0, len(self.profile_results), str(list(self.profile_results)))
+
+ with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
+ _do_computation(self.spark)
+ self.assertEqual(3, len(self.profile_results), str(list(self.profile_results)))
+
+ self.spark.profile.clear()
+ self.assertEqual(0, len(self.profile_results), str(list(self.profile_results)))
+
+ def test_profilers_clear(self):
+ with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
+ _do_computation(self.spark)
+ with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+ _do_computation(self.spark)
+
+ self.assertEqual(3, len(self.profile_results), str(list(self.profile_results)))
+
+ # clear a specific memory profile
+ some_id = next(iter(self.profile_results))
+ self.spark.profile.clear(some_id, type="memory")
+ self.assertEqual(2, len(self.profile_results), str(list(self.profile_results)))
+ self.assertEqual(3, len(self.perf_profile_results), str(list(self.perf_profile_results)))
+
+ # clear a specific perf profile
+ some_id = next(iter(self.perf_profile_results))
+ self.spark.profile.clear(some_id, type="perf")
+ self.assertEqual(2, len(self.perf_profile_results), str(list(self.perf_profile_results)))
+ self.assertEqual(2, len(self.profile_results), str(list(self.profile_results)))
+
+ # clear all memory profiles
+ self.spark.profile.clear(type="memory")
+ self.assertEqual(0, len(self.profile_results), str(list(self.profile_results)))
+ self.assertEqual(2, len(self.perf_profile_results), str(list(self.perf_profile_results)))
+
+ # clear all perf profiles
+ self.spark.profile.clear(type="perf")
+ self.assertEqual(0, len(self.perf_profile_results), str(list(self.perf_profile_results)))
+
class MemoryProfiler2Tests(MemoryProfiler2TestsMixin, ReusedSQLTestCase):
def setUp(self) -> None:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]