This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new b67ef6c29da3 [SPARK-55032][PYTHON] Refactor profilers in workers.py
b67ef6c29da3 is described below
commit b67ef6c29da305c62a1d990d5e9648e790a35535
Author: Tian Gao <[email protected]>
AuthorDate: Mon Jan 19 08:57:40 2026 +0800
[SPARK-55032][PYTHON] Refactor profilers in workers.py
### What changes were proposed in this pull request?
Refactored the perf and memory profilers in workers.py so that adding a
profiler only requires a single line of code.
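For illustration, after this change a worker wraps the UDF call in a single `with` statement (a sketch based on the diff below; `f`, `accumulator`, and `result_id` come from the surrounding wrapper in worker.py):

```python
from pyspark.sql.profiler import WorkerPerfProfiler

def profiling_func(*args, **kwargs):
    # The profiler starts on __enter__ and, on __exit__,
    # stops and saves its results into the accumulator.
    with WorkerPerfProfiler(accumulator, result_id):
        ret = f(*args, **kwargs)
    return ret
```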
### Why are the changes needed?
This makes it easier to reuse the code in other workers, such as the data
source worker.
It will also make it easier to replace the backend of the perf/memory
profilers in the future, as sketched below.
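As a sketch of that reuse (hypothetical code, not part of this patch): any worker that has deserialized the profile-results accumulator can drive either profiler through the same context-manager protocol, so a backend swap would only touch the `WorkerPerfProfiler` / `WorkerMemoryProfiler` classes. The helper and its name below are illustrative only:

```python
from pyspark.sql.profiler import WorkerMemoryProfiler, WorkerPerfProfiler

# Hypothetical helper for another worker (e.g. a data source worker);
# run_with_profiler and profiler_cls are illustrative names.
def run_with_profiler(profiler_cls, func, accumulator, result_id, *args):
    if profiler_cls is WorkerMemoryProfiler:
        # The memory profiler needs the target function to register it.
        profiler = profiler_cls(accumulator, result_id, func)
    else:
        profiler = profiler_cls(accumulator, result_id)
    with profiler:
        return func(*args)
```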
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
`test_udf_profilers` passed locally.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #53793 from gaogaotiantian/refactor-profiler.
Authored-by: Tian Gao <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
 python/pyspark/sql/profiler.py | 81 ++++++++++++++++++++++++++++++++++++++++++
 python/pyspark/worker.py       | 47 ++++--------------------
 2 files changed, 87 insertions(+), 41 deletions(-)
diff --git a/python/pyspark/sql/profiler.py b/python/pyspark/sql/profiler.py
index f7b391ec8000..8455aacafc45 100644
--- a/python/pyspark/sql/profiler.py
+++ b/python/pyspark/sql/profiler.py
@@ -16,9 +16,11 @@
 #
 
 from abc import ABC, abstractmethod
 from io import StringIO
+import cProfile
 import os
 import pstats
 from threading import RLock
+from types import TracebackType
 from typing import Any, Callable, Dict, Literal, Optional, Tuple, Union, TYPE_CHECKING, overload
 import warnings
@@ -75,6 +77,85 @@ class _ProfileResultsParam(AccumulatorParam[Optional["ProfileResults"]]):
 ProfileResultsParam = _ProfileResultsParam()
 
 
+class WorkerPerfProfiler:
+    """
+    WorkerPerfProfiler is a performance profiler backed by cProfile.
+    """
+
+    def __init__(self, accumulator: Accumulator["ProfileResults"], result_id: int) -> None:
+        self._accumulator = accumulator
+        self._profiler = cProfile.Profile()
+        self._result_id = result_id
+
+    def start(self) -> None:
+        self._profiler.enable()
+
+    def stop(self) -> None:
+        self._profiler.disable()
+
+    def save(self) -> None:
+        st = pstats.Stats(self._profiler, stream=None)
+        # make it picklable
+        st.stream = None  # type: ignore[attr-defined]
+        st.strip_dirs()
+        self._accumulator.add({self._result_id: (st, None)})
+
+    def __enter__(self) -> "WorkerPerfProfiler":
+        self.start()
+        return self
+
+    def __exit__(
+        self,
+        exc_type: Optional[type[BaseException]],
+        exc_val: Optional[BaseException],
+        exc_tb: Optional[TracebackType],
+    ) -> None:
+        self.stop()
+        self.save()
+
+
+class WorkerMemoryProfiler:
+    """
+    WorkerMemoryProfiler is a line-by-line memory profiler backed by UDFLineProfilerV2.
+    """
+
+    def __init__(
+        self, accumulator: Accumulator["ProfileResults"], result_id: int, func: Callable
+    ) -> None:
+        from pyspark.profiler import UDFLineProfilerV2
+
+        self._accumulator = accumulator
+        self._profiler = UDFLineProfilerV2()
+        self._profiler.add_function(func)
+        self._result_id = result_id
+
+    def start(self) -> None:
+        self._profiler.enable_by_count()
+
+    def stop(self) -> None:
+        self._profiler.disable_by_count()
+
+    def save(self) -> None:
+        codemap_dict = {
+            filename: list(line_iterator)
+            for filename, line_iterator in self._profiler.code_map.items()
+        }
+        self._accumulator.add({self._result_id: (None, codemap_dict)})
+
+    def __enter__(self) -> "WorkerMemoryProfiler":
+        self.start()
+        return self
+
+    def __exit__(
+        self,
+        exc_type: Optional[type[BaseException]],
+        exc_val: Optional[BaseException],
+        exc_tb: Optional[TracebackType],
+    ) -> None:
+        self.stop()
+        self.save()
+
+
 class ProfilerCollector(ABC):
     """
     A base class of profiler collectors for session based profilers.
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index b65636857cfe..96a1cfb37c3b 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -1335,10 +1335,7 @@ def _is_iter_based(eval_type: int) -> bool:
def wrap_perf_profiler(f, eval_type, result_id):
- import cProfile
- import pstats
-
- from pyspark.sql.profiler import ProfileResultsParam
+ from pyspark.sql.profiler import ProfileResultsParam, WorkerPerfProfiler
accumulator = _deserialize_accumulator(
SpecialAccumulatorIds.SQL_UDF_PROFIER, None, ProfileResultsParam
@@ -1348,40 +1345,26 @@ def wrap_perf_profiler(f, eval_type, result_id):
         def profiling_func(*args, **kwargs):
             iterator = iter(f(*args, **kwargs))
-            pr = cProfile.Profile()
             while True:
                 try:
-                    with pr:
+                    with WorkerPerfProfiler(accumulator, result_id):
                         item = next(iterator)
                     yield item
                 except StopIteration:
                     break
 
-            st = pstats.Stats(pr)
-            st.stream = None  # make it picklable
-            st.strip_dirs()
-
-            accumulator.add({result_id: (st, None)})
-
     else:
 
         def profiling_func(*args, **kwargs):
-            with cProfile.Profile() as pr:
+            with WorkerPerfProfiler(accumulator, result_id):
                 ret = f(*args, **kwargs)
 
-            st = pstats.Stats(pr)
-            st.stream = None  # make it picklable
-            st.strip_dirs()
-
-            accumulator.add({result_id: (st, None)})
-
             return ret
 
     return profiling_func
 
 
 def wrap_memory_profiler(f, eval_type, result_id):
-    from pyspark.sql.profiler import ProfileResultsParam
-    from pyspark.profiler import UDFLineProfilerV2
+    from pyspark.sql.profiler import ProfileResultsParam, WorkerMemoryProfiler
 
     if not has_memory_profiler:
         return f
@@ -1393,39 +1376,21 @@ def wrap_memory_profiler(f, eval_type, result_id):
     if _is_iter_based(eval_type):
 
         def profiling_func(*args, **kwargs):
-            profiler = UDFLineProfilerV2()
-            profiler.add_function(f)
-
            iterator = iter(f(*args, **kwargs))
             while True:
                 try:
-                    with profiler:
+                    with WorkerMemoryProfiler(accumulator, result_id, f):
                         item = next(iterator)
                     yield item
                 except StopIteration:
                     break
 
-            codemap_dict = {
-                filename: list(line_iterator)
-                for filename, line_iterator in profiler.code_map.items()
-            }
-            accumulator.add({result_id: (None, codemap_dict)})
-
     else:
 
         def profiling_func(*args, **kwargs):
-            profiler = UDFLineProfilerV2()
-            profiler.add_function(f)
-
-            with profiler:
+            with WorkerMemoryProfiler(accumulator, result_id, f):
                 ret = f(*args, **kwargs)
-
-            codemap_dict = {
-                filename: list(line_iterator)
-                for filename, line_iterator in profiler.code_map.items()
-            }
-            accumulator.add({result_id: (None, codemap_dict)})
 
             return ret
 
     return profiling_func
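A side note on `WorkerPerfProfiler.save()` above: a `pstats.Stats` object holds a reference to an output stream (stdout by default), and that stream is what blocks pickling, so clearing `stream` is enough to ship the stats through an accumulator. A minimal standalone sketch of the same technique, outside Spark:

```python
import cProfile
import pickle
import pstats

pr = cProfile.Profile()
with pr:  # cProfile.Profile has been a context manager since Python 3.8
    sum(i * i for i in range(1000))

st = pstats.Stats(pr)
st.stream = None  # drop the stdout reference so the Stats object pickles
st.strip_dirs()   # normalize file names, as save() does
payload = pickle.dumps(st)          # would raise TypeError with the stream attached
restored = pickle.loads(payload)
restored.sort_stats("cumulative")   # the stats survive the round trip
```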
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]