This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 0c8d18d41ed2 [SPARK-52599][PYTHON] Support periodical traceback dump in Driver side workers
0c8d18d41ed2 is described below
commit 0c8d18d41ed24db814055c95774b67cd4f6fd5d5
Author: Takuya Ueshin <[email protected]>
AuthorDate: Mon Jun 30 16:44:15 2025 +0900
[SPARK-52599][PYTHON] Support periodical traceback dump in Driver side workers
### What changes were proposed in this pull request?
Support periodical traceback dump in Driver side workers.
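Concretely, each driver-side worker entry point reads the `PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS` environment variable set by `PythonPlannerRunner` and, when it is positive, arms `faulthandler.dump_traceback_later`, cancelling it once the worker finishes. A minimal standalone sketch of that pattern (the `run_worker` body here is a placeholder, not code from this patch):
```py
import faulthandler
import os
import time


def run_worker() -> None:
    # Placeholder for the actual worker logic, e.g. running a UDTF's `analyze`.
    time.sleep(12)


if __name__ == "__main__":
    interval = os.environ.get("PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS", None)
    try:
        if interval is not None and int(interval) > 0:
            # Periodically dump all thread tracebacks to stderr while the worker runs.
            faulthandler.dump_traceback_later(int(interval), repeat=True)
        run_worker()
    finally:
        # Stop the periodic dump once the worker is done.
        faulthandler.cancel_dump_traceback_later()
```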
### Why are the changes needed?
The periodical traceback dump introduced by
[SPARK-52579](https://issues.apache.org/jira/browse/SPARK-52579) should be
available in Driver side Python workers, too.
### Does this PR introduce _any_ user-facing change?
Yes, the traceback will be dumped periodically from Driver side workers,
too.
```py
>>> from pyspark.sql import functions as sf
>>> from pyspark.sql.types import *
>>> from pyspark.sql.udtf import AnalyzeResult
>>> import time
>>>
>>> @sf.udtf
... class TestUDTF:
...     @staticmethod
...     def analyze(x):
...         time.sleep(12)
...         return AnalyzeResult(StructType().add("x_out", x.dataType))
...     def eval(self, x):
...         yield x,
...
>>> spark.conf.set('spark.sql.execution.pyspark.udf.tracebackDumpIntervalSeconds', 10)
>>> TestUDTF(sf.lit(1)).show()
Timeout (0:00:10)!
Thread 0x00000001ff418f80 (most recent call first):
File "<stdin>", line 5 in analyze
File "/.../pyspark/sql/worker/analyze_udtf.py", line 160 in main
...
+-----+
|x_out|
+-----+
|    1|
+-----+
```
### How was this patch tested?
Manually, and existing tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #51309 from ueshin/issues/SPARK-52599/planner_workers.
Authored-by: Takuya Ueshin <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
core/src/main/scala/org/apache/spark/internal/config/Python.scala | 2 +-
python/pyspark/sql/worker/analyze_udtf.py | 7 +++++++
python/pyspark/sql/worker/commit_data_source_write.py | 7 +++++++
python/pyspark/sql/worker/create_data_source.py | 7 +++++++
python/pyspark/sql/worker/data_source_pushdown_filters.py | 7 +++++++
python/pyspark/sql/worker/lookup_data_sources.py | 7 +++++++
python/pyspark/sql/worker/plan_data_source_read.py | 7 +++++++
python/pyspark/sql/worker/python_streaming_sink_runner.py | 8 ++++++++
python/pyspark/sql/worker/write_into_data_source.py | 7 +++++++
python/pyspark/worker.py | 6 ++----
.../apache/spark/sql/execution/python/PythonPlannerRunner.scala | 4 ++++
11 files changed, 64 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Python.scala b/core/src/main/scala/org/apache/spark/internal/config/Python.scala
index 8c3adedb372a..79f6d5bcb854 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Python.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Python.scala
@@ -122,7 +122,7 @@ private[spark] object Python {
ConfigBuilder("spark.python.worker.tracebackDumpIntervalSeconds")
.doc("The interval (in seconds) for Python workers to dump their
tracebacks. " +
"If it's positive, the Python worker will periodically dump the
traceback into " +
- "its executor's `stderr`. The default is `0` that means it is
disabled.")
+ "its `stderr`. The default is `0` that means it is disabled.")
.version("4.1.0")
.timeConf(TimeUnit.SECONDS)
.checkValue(_ >= 0, "The interval should be 0 or positive.")
diff --git a/python/pyspark/sql/worker/analyze_udtf.py b/python/pyspark/sql/worker/analyze_udtf.py
index 1c926f4980a5..892130bbae16 100644
--- a/python/pyspark/sql/worker/analyze_udtf.py
+++ b/python/pyspark/sql/worker/analyze_udtf.py
@@ -108,6 +108,7 @@ def main(infile: IO, outfile: IO) -> None:
     and call the `analyze` static method, and send back a AnalyzeResult as a result of the method.
     """
     faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
+    tracebackDumpIntervalSeconds = os.environ.get("PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS", None)
     try:
         if faulthandler_log_path:
             faulthandler_log_path = os.path.join(faulthandler_log_path, str(os.getpid()))
@@ -116,6 +117,9 @@ def main(infile: IO, outfile: IO) -> None:
         check_python_version(infile)
+        if tracebackDumpIntervalSeconds is not None and int(tracebackDumpIntervalSeconds) > 0:
+            faulthandler.dump_traceback_later(int(tracebackDumpIntervalSeconds), repeat=True)
+
         memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", "-1"))
         setup_memory_limits(memory_limit_mb)
@@ -270,6 +274,9 @@ def main(infile: IO, outfile: IO) -> None:
         write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
         sys.exit(-1)
+    # Force to cancel dump_traceback_later
+    faulthandler.cancel_dump_traceback_later()
+
 if __name__ == "__main__":
     # Read information about how to connect back to the JVM from the environment.
diff --git a/python/pyspark/sql/worker/commit_data_source_write.py b/python/pyspark/sql/worker/commit_data_source_write.py
index d08d65974dfb..dd080f1feb6c 100644
--- a/python/pyspark/sql/worker/commit_data_source_write.py
+++ b/python/pyspark/sql/worker/commit_data_source_write.py
@@ -49,6 +49,7 @@ def main(infile: IO, outfile: IO) -> None:
     writer instance, given a list of commit messages.
     """
     faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
+    tracebackDumpIntervalSeconds = os.environ.get("PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS", None)
     try:
         if faulthandler_log_path:
             faulthandler_log_path = os.path.join(faulthandler_log_path, str(os.getpid()))
@@ -57,6 +58,9 @@ def main(infile: IO, outfile: IO) -> None:
         check_python_version(infile)
+        if tracebackDumpIntervalSeconds is not None and int(tracebackDumpIntervalSeconds) > 0:
+            faulthandler.dump_traceback_later(int(tracebackDumpIntervalSeconds), repeat=True)
+
         memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", "-1"))
         setup_memory_limits(memory_limit_mb)
@@ -116,6 +120,9 @@ def main(infile: IO, outfile: IO) -> None:
         write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
         sys.exit(-1)
+    # Force to cancel dump_traceback_later
+    faulthandler.cancel_dump_traceback_later()
+
 if __name__ == "__main__":
     # Read information about how to connect back to the JVM from the environment.
diff --git a/python/pyspark/sql/worker/create_data_source.py b/python/pyspark/sql/worker/create_data_source.py
index 424f07012723..fc1b8eaffdaa 100644
--- a/python/pyspark/sql/worker/create_data_source.py
+++ b/python/pyspark/sql/worker/create_data_source.py
@@ -62,6 +62,7 @@ def main(infile: IO, outfile: IO) -> None:
     sends the pickled instance as well as the schema back to the JVM.
     """
     faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
+    tracebackDumpIntervalSeconds = os.environ.get("PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS", None)
     try:
         if faulthandler_log_path:
             faulthandler_log_path = os.path.join(faulthandler_log_path, str(os.getpid()))
@@ -70,6 +71,9 @@ def main(infile: IO, outfile: IO) -> None:
         check_python_version(infile)
+        if tracebackDumpIntervalSeconds is not None and int(tracebackDumpIntervalSeconds) > 0:
+            faulthandler.dump_traceback_later(int(tracebackDumpIntervalSeconds), repeat=True)
+
         memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", "-1"))
         setup_memory_limits(memory_limit_mb)
@@ -181,6 +185,9 @@ def main(infile: IO, outfile: IO) -> None:
         write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
         sys.exit(-1)
+    # Force to cancel dump_traceback_later
+    faulthandler.cancel_dump_traceback_later()
+
 if __name__ == "__main__":
     # Read information about how to connect back to the JVM from the environment.
diff --git a/python/pyspark/sql/worker/data_source_pushdown_filters.py b/python/pyspark/sql/worker/data_source_pushdown_filters.py
index 0415f450fe0f..ac6f84e61715 100644
--- a/python/pyspark/sql/worker/data_source_pushdown_filters.py
+++ b/python/pyspark/sql/worker/data_source_pushdown_filters.py
@@ -140,6 +140,7 @@ def main(infile: IO, outfile: IO) -> None:
     filters are sent back to the JVM, along with the list of partitions and the read function.
     """
     faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
+    tracebackDumpIntervalSeconds = os.environ.get("PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS", None)
     try:
         if faulthandler_log_path:
             faulthandler_log_path = os.path.join(faulthandler_log_path, str(os.getpid()))
@@ -148,6 +149,9 @@ def main(infile: IO, outfile: IO) -> None:
         check_python_version(infile)
+        if tracebackDumpIntervalSeconds is not None and int(tracebackDumpIntervalSeconds) > 0:
+            faulthandler.dump_traceback_later(int(tracebackDumpIntervalSeconds), repeat=True)
+
         memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", "-1"))
         setup_memory_limits(memory_limit_mb)
@@ -266,6 +270,9 @@ def main(infile: IO, outfile: IO) -> None:
         write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
         sys.exit(-1)
+    # Force to cancel dump_traceback_later
+    faulthandler.cancel_dump_traceback_later()
+
 if __name__ == "__main__":
     # Read information about how to connect back to the JVM from the environment.
diff --git a/python/pyspark/sql/worker/lookup_data_sources.py b/python/pyspark/sql/worker/lookup_data_sources.py
index af138ab68965..eeb84263d445 100644
--- a/python/pyspark/sql/worker/lookup_data_sources.py
+++ b/python/pyspark/sql/worker/lookup_data_sources.py
@@ -52,6 +52,7 @@ def main(infile: IO, outfile: IO) -> None:
     statically registered automatically.
     """
     faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
+    tracebackDumpIntervalSeconds = os.environ.get("PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS", None)
     try:
         if faulthandler_log_path:
             faulthandler_log_path = os.path.join(faulthandler_log_path, str(os.getpid()))
@@ -60,6 +61,9 @@ def main(infile: IO, outfile: IO) -> None:
         check_python_version(infile)
+        if tracebackDumpIntervalSeconds is not None and int(tracebackDumpIntervalSeconds) > 0:
+            faulthandler.dump_traceback_later(int(tracebackDumpIntervalSeconds), repeat=True)
+
         memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", "-1"))
         setup_memory_limits(memory_limit_mb)
@@ -101,6 +105,9 @@ def main(infile: IO, outfile: IO) -> None:
         write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
         sys.exit(-1)
+    # Force to cancel dump_traceback_later
+    faulthandler.cancel_dump_traceback_later()
+
 if __name__ == "__main__":
     # Read information about how to connect back to the JVM from the environment.
diff --git a/python/pyspark/sql/worker/plan_data_source_read.py b/python/pyspark/sql/worker/plan_data_source_read.py
index 5edc8185adcf..7c14ebfc53e4 100644
--- a/python/pyspark/sql/worker/plan_data_source_read.py
+++ b/python/pyspark/sql/worker/plan_data_source_read.py
@@ -284,6 +284,7 @@ def main(infile: IO, outfile: IO) -> None:
     via the socket.
     """
     faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
+    tracebackDumpIntervalSeconds = os.environ.get("PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS", None)
     try:
         if faulthandler_log_path:
             faulthandler_log_path = os.path.join(faulthandler_log_path, str(os.getpid()))
@@ -292,6 +293,9 @@ def main(infile: IO, outfile: IO) -> None:
         check_python_version(infile)
+        if tracebackDumpIntervalSeconds is not None and int(tracebackDumpIntervalSeconds) > 0:
+            faulthandler.dump_traceback_later(int(tracebackDumpIntervalSeconds), repeat=True)
+
         memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", "-1"))
         setup_memory_limits(memory_limit_mb)
@@ -406,6 +410,9 @@ def main(infile: IO, outfile: IO) -> None:
         write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
         sys.exit(-1)
+    # Force to cancel dump_traceback_later
+    faulthandler.cancel_dump_traceback_later()
+
 if __name__ == "__main__":
     # Read information about how to connect back to the JVM from the environment.
diff --git a/python/pyspark/sql/worker/python_streaming_sink_runner.py b/python/pyspark/sql/worker/python_streaming_sink_runner.py
index cf6246b54490..83ba027a0601 100644
--- a/python/pyspark/sql/worker/python_streaming_sink_runner.py
+++ b/python/pyspark/sql/worker/python_streaming_sink_runner.py
@@ -57,6 +57,7 @@ def main(infile: IO, outfile: IO) -> None:
     writer instance, given a list of commit messages.
     """
     faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
+    tracebackDumpIntervalSeconds = os.environ.get("PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS", None)
     try:
         if faulthandler_log_path:
             faulthandler_log_path = os.path.join(faulthandler_log_path, str(os.getpid()))
@@ -64,6 +65,10 @@ def main(infile: IO, outfile: IO) -> None:
             faulthandler.enable(file=faulthandler_log_file)
         check_python_version(infile)
+
+        if tracebackDumpIntervalSeconds is not None and int(tracebackDumpIntervalSeconds) > 0:
+            faulthandler.dump_traceback_later(int(tracebackDumpIntervalSeconds), repeat=True)
+
         setup_spark_files(infile)
         setup_broadcasts(infile)
@@ -145,6 +150,9 @@ def main(infile: IO, outfile: IO) -> None:
         write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
         sys.exit(-1)
+    # Force to cancel dump_traceback_later
+    faulthandler.cancel_dump_traceback_later()
+
 if __name__ == "__main__":
     # Read information about how to connect back to the JVM from the environment.
diff --git a/python/pyspark/sql/worker/write_into_data_source.py b/python/pyspark/sql/worker/write_into_data_source.py
index d6d055f01e54..e2fa147becce 100644
--- a/python/pyspark/sql/worker/write_into_data_source.py
+++ b/python/pyspark/sql/worker/write_into_data_source.py
@@ -76,6 +76,7 @@ def main(infile: IO, outfile: IO) -> None:
     in mapInPandas/mapInArrow back to the JVM.
     """
     faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
+    tracebackDumpIntervalSeconds = os.environ.get("PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS", None)
     try:
         if faulthandler_log_path:
             faulthandler_log_path = os.path.join(faulthandler_log_path, str(os.getpid()))
@@ -84,6 +85,9 @@ def main(infile: IO, outfile: IO) -> None:
         check_python_version(infile)
+        if tracebackDumpIntervalSeconds is not None and int(tracebackDumpIntervalSeconds) > 0:
+            faulthandler.dump_traceback_later(int(tracebackDumpIntervalSeconds), repeat=True)
+
         memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", "-1"))
         setup_memory_limits(memory_limit_mb)
@@ -252,6 +256,9 @@ def main(infile: IO, outfile: IO) -> None:
         write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
         sys.exit(-1)
+    # Force to cancel dump_traceback_later
+    faulthandler.cancel_dump_traceback_later()
+
 if __name__ == "__main__":
     # Read information about how to connect back to the JVM from the environment.
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 3454c9855a58..21735d7624bb 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -2348,8 +2348,6 @@ def read_udfs(pickleSer, infile, eval_type):
 def main(infile, outfile):
     faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
     tracebackDumpIntervalSeconds = os.environ.get("PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS", None)
-    if tracebackDumpIntervalSeconds is not None:
-        tracebackDumpIntervalSeconds = int(tracebackDumpIntervalSeconds)
     try:
         if faulthandler_log_path:
             faulthandler_log_path = os.path.join(faulthandler_log_path, str(os.getpid()))
@@ -2361,8 +2359,8 @@ def main(infile, outfile):
         if split_index == -1:  # for unit tests
             sys.exit(-1)
-        if tracebackDumpIntervalSeconds is not None and tracebackDumpIntervalSeconds > 0:
-            faulthandler.dump_traceback_later(tracebackDumpIntervalSeconds, repeat=True)
+        if tracebackDumpIntervalSeconds is not None and int(tracebackDumpIntervalSeconds) > 0:
+            faulthandler.dump_traceback_later(int(tracebackDumpIntervalSeconds), repeat=True)
         check_python_version(infile)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonPlannerRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonPlannerRunner.scala
index 897b520f0b1a..7834b591e2e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonPlannerRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonPlannerRunner.scala
@@ -57,6 +57,7 @@ abstract class PythonPlannerRunner[T](func: PythonFunction) extends Logging {
   val faultHandlerEnabled: Boolean = SQLConf.get.pythonUDFWorkerFaulthandlerEnabled
   val idleTimeoutSeconds: Long = SQLConf.get.pythonUDFWorkerIdleTimeoutSeconds
   val killOnIdleTimeout: Boolean = SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
+  val tracebackDumpIntervalSeconds: Long = SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
   val hideTraceback: Boolean = SQLConf.get.pysparkHideTraceback
   val simplifiedTraceback: Boolean = SQLConf.get.pysparkSimplifiedTraceback
   val workerMemoryMb = SQLConf.get.pythonPlannerExecMemory
@@ -88,6 +89,9 @@ abstract class PythonPlannerRunner[T](func: PythonFunction) extends Logging {
     if (faultHandlerEnabled) {
       envVars.put("PYTHON_FAULTHANDLER_DIR", faultHandlerLogDir.toString)
     }
+    if (tracebackDumpIntervalSeconds > 0L) {
+      envVars.put("PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS", tracebackDumpIntervalSeconds.toString)
+    }
     envVars.put("SPARK_JOB_ARTIFACT_UUID", jobArtifactUUID.getOrElse("default"))