This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c8d18d41ed2 [SPARK-52599][PYTHON] Support periodical traceback dump 
in Driver side workers
0c8d18d41ed2 is described below

commit 0c8d18d41ed24db814055c95774b67cd4f6fd5d5
Author: Takuya Ueshin <[email protected]>
AuthorDate: Mon Jun 30 16:44:15 2025 +0900

    [SPARK-52599][PYTHON] Support periodical traceback dump in Driver side 
workers
    
    ### What changes were proposed in this pull request?
    
    Support periodical traceback dump in Driver side workers.
    
    ### Why are the changes needed?
    
    The periodic traceback dump introduced by
    [SPARK-52579](https://issues.apache.org/jira/browse/SPARK-52579) should also
    be available in driver-side Python workers.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. When the interval is set to a positive value, the traceback is now also
    dumped periodically from driver-side workers.
    
    ```py
    >>> from pyspark.sql import functions as sf
    >>> from pyspark.sql.types import *
    >>> from pyspark.sql.udtf import AnalyzeResult
    >>> import time
    >>>
    >>> @sf.udtf
    ... class TestUDTF:
    ...     @staticmethod
    ...     def analyze(x):
    ...         time.sleep(12)
    ...         return AnalyzeResult(StructType().add("x_out", x.dataType))
    ...     def eval(self, x):
    ...         yield x,
    ...
    >>> spark.conf.set('spark.sql.execution.pyspark.udf.tracebackDumpIntervalSeconds', 10)
    >>> TestUDTF(sf.lit(1)).show()
    Timeout (0:00:10)!
    Thread 0x00000001ff418f80 (most recent call first):
      File "<stdin>", line 5 in analyze
      File "/.../pyspark/sql/worker/analyze_udtf.py", line 160 in main
    ...
    
    +-----+
    |x_out|
    +-----+
    |    1|
    +-----+
    ```
    
    ### How was this patch tested?
    
    Manually, and existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #51309 from ueshin/issues/SPARK-52599/planner_workers.
    
    Authored-by: Takuya Ueshin <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 core/src/main/scala/org/apache/spark/internal/config/Python.scala | 2 +-
 python/pyspark/sql/worker/analyze_udtf.py                         | 7 +++++++
 python/pyspark/sql/worker/commit_data_source_write.py             | 7 +++++++
 python/pyspark/sql/worker/create_data_source.py                   | 7 +++++++
 python/pyspark/sql/worker/data_source_pushdown_filters.py         | 7 +++++++
 python/pyspark/sql/worker/lookup_data_sources.py                  | 7 +++++++
 python/pyspark/sql/worker/plan_data_source_read.py                | 7 +++++++
 python/pyspark/sql/worker/python_streaming_sink_runner.py         | 8 ++++++++
 python/pyspark/sql/worker/write_into_data_source.py               | 7 +++++++
 python/pyspark/worker.py                                          | 6 ++----
 .../apache/spark/sql/execution/python/PythonPlannerRunner.scala   | 4 ++++
 11 files changed, 64 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/Python.scala 
b/core/src/main/scala/org/apache/spark/internal/config/Python.scala
index 8c3adedb372a..79f6d5bcb854 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Python.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Python.scala
@@ -122,7 +122,7 @@ private[spark] object Python {
     ConfigBuilder("spark.python.worker.tracebackDumpIntervalSeconds")
       .doc("The interval (in seconds) for Python workers to dump their 
tracebacks. " +
         "If it's positive, the Python worker will periodically dump the 
traceback into " +
-        "its executor's `stderr`. The default is `0` that means it is 
disabled.")
+        "its `stderr`. The default is `0` that means it is disabled.")
       .version("4.1.0")
       .timeConf(TimeUnit.SECONDS)
       .checkValue(_ >= 0, "The interval should be 0 or positive.")
diff --git a/python/pyspark/sql/worker/analyze_udtf.py 
b/python/pyspark/sql/worker/analyze_udtf.py
index 1c926f4980a5..892130bbae16 100644
--- a/python/pyspark/sql/worker/analyze_udtf.py
+++ b/python/pyspark/sql/worker/analyze_udtf.py
@@ -108,6 +108,7 @@ def main(infile: IO, outfile: IO) -> None:
     and call the `analyze` static method, and send back a AnalyzeResult as a 
result of the method.
     """
     faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
+    tracebackDumpIntervalSeconds = 
os.environ.get("PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS", None)
     try:
         if faulthandler_log_path:
             faulthandler_log_path = os.path.join(faulthandler_log_path, 
str(os.getpid()))
@@ -116,6 +117,9 @@ def main(infile: IO, outfile: IO) -> None:
 
         check_python_version(infile)
 
+        if tracebackDumpIntervalSeconds is not None and 
int(tracebackDumpIntervalSeconds) > 0:
+            
faulthandler.dump_traceback_later(int(tracebackDumpIntervalSeconds), 
repeat=True)
+
         memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", 
"-1"))
         setup_memory_limits(memory_limit_mb)
 
@@ -270,6 +274,9 @@ def main(infile: IO, outfile: IO) -> None:
         write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
         sys.exit(-1)
 
+    # Force to cancel dump_traceback_later
+    faulthandler.cancel_dump_traceback_later()
+
 
 if __name__ == "__main__":
     # Read information about how to connect back to the JVM from the 
environment.
diff --git a/python/pyspark/sql/worker/commit_data_source_write.py 
b/python/pyspark/sql/worker/commit_data_source_write.py
index d08d65974dfb..dd080f1feb6c 100644
--- a/python/pyspark/sql/worker/commit_data_source_write.py
+++ b/python/pyspark/sql/worker/commit_data_source_write.py
@@ -49,6 +49,7 @@ def main(infile: IO, outfile: IO) -> None:
     writer instance, given a list of commit messages.
     """
     faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
+    tracebackDumpIntervalSeconds = 
os.environ.get("PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS", None)
     try:
         if faulthandler_log_path:
             faulthandler_log_path = os.path.join(faulthandler_log_path, 
str(os.getpid()))
@@ -57,6 +58,9 @@ def main(infile: IO, outfile: IO) -> None:
 
         check_python_version(infile)
 
+        if tracebackDumpIntervalSeconds is not None and 
int(tracebackDumpIntervalSeconds) > 0:
+            
faulthandler.dump_traceback_later(int(tracebackDumpIntervalSeconds), 
repeat=True)
+
         memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", 
"-1"))
         setup_memory_limits(memory_limit_mb)
 
@@ -116,6 +120,9 @@ def main(infile: IO, outfile: IO) -> None:
         write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
         sys.exit(-1)
 
+    # Force to cancel dump_traceback_later
+    faulthandler.cancel_dump_traceback_later()
+
 
 if __name__ == "__main__":
     # Read information about how to connect back to the JVM from the 
environment.
diff --git a/python/pyspark/sql/worker/create_data_source.py 
b/python/pyspark/sql/worker/create_data_source.py
index 424f07012723..fc1b8eaffdaa 100644
--- a/python/pyspark/sql/worker/create_data_source.py
+++ b/python/pyspark/sql/worker/create_data_source.py
@@ -62,6 +62,7 @@ def main(infile: IO, outfile: IO) -> None:
     sends the pickled instance as well as the schema back to the JVM.
     """
     faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
+    tracebackDumpIntervalSeconds = 
os.environ.get("PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS", None)
     try:
         if faulthandler_log_path:
             faulthandler_log_path = os.path.join(faulthandler_log_path, 
str(os.getpid()))
@@ -70,6 +71,9 @@ def main(infile: IO, outfile: IO) -> None:
 
         check_python_version(infile)
 
+        if tracebackDumpIntervalSeconds is not None and 
int(tracebackDumpIntervalSeconds) > 0:
+            
faulthandler.dump_traceback_later(int(tracebackDumpIntervalSeconds), 
repeat=True)
+
         memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", 
"-1"))
         setup_memory_limits(memory_limit_mb)
 
@@ -181,6 +185,9 @@ def main(infile: IO, outfile: IO) -> None:
         write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
         sys.exit(-1)
 
+    # Force to cancel dump_traceback_later
+    faulthandler.cancel_dump_traceback_later()
+
 
 if __name__ == "__main__":
     # Read information about how to connect back to the JVM from the 
environment.
diff --git a/python/pyspark/sql/worker/data_source_pushdown_filters.py 
b/python/pyspark/sql/worker/data_source_pushdown_filters.py
index 0415f450fe0f..ac6f84e61715 100644
--- a/python/pyspark/sql/worker/data_source_pushdown_filters.py
+++ b/python/pyspark/sql/worker/data_source_pushdown_filters.py
@@ -140,6 +140,7 @@ def main(infile: IO, outfile: IO) -> None:
     filters are sent back to the JVM, along with the list of partitions and 
the read function.
     """
     faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
+    tracebackDumpIntervalSeconds = 
os.environ.get("PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS", None)
     try:
         if faulthandler_log_path:
             faulthandler_log_path = os.path.join(faulthandler_log_path, 
str(os.getpid()))
@@ -148,6 +149,9 @@ def main(infile: IO, outfile: IO) -> None:
 
         check_python_version(infile)
 
+        if tracebackDumpIntervalSeconds is not None and 
int(tracebackDumpIntervalSeconds) > 0:
+            
faulthandler.dump_traceback_later(int(tracebackDumpIntervalSeconds), 
repeat=True)
+
         memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", 
"-1"))
         setup_memory_limits(memory_limit_mb)
 
@@ -266,6 +270,9 @@ def main(infile: IO, outfile: IO) -> None:
         write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
         sys.exit(-1)
 
+    # Force to cancel dump_traceback_later
+    faulthandler.cancel_dump_traceback_later()
+
 
 if __name__ == "__main__":
     # Read information about how to connect back to the JVM from the 
environment.
diff --git a/python/pyspark/sql/worker/lookup_data_sources.py 
b/python/pyspark/sql/worker/lookup_data_sources.py
index af138ab68965..eeb84263d445 100644
--- a/python/pyspark/sql/worker/lookup_data_sources.py
+++ b/python/pyspark/sql/worker/lookup_data_sources.py
@@ -52,6 +52,7 @@ def main(infile: IO, outfile: IO) -> None:
     statically registered automatically.
     """
     faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
+    tracebackDumpIntervalSeconds = 
os.environ.get("PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS", None)
     try:
         if faulthandler_log_path:
             faulthandler_log_path = os.path.join(faulthandler_log_path, 
str(os.getpid()))
@@ -60,6 +61,9 @@ def main(infile: IO, outfile: IO) -> None:
 
         check_python_version(infile)
 
+        if tracebackDumpIntervalSeconds is not None and 
int(tracebackDumpIntervalSeconds) > 0:
+            
faulthandler.dump_traceback_later(int(tracebackDumpIntervalSeconds), 
repeat=True)
+
         memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", 
"-1"))
         setup_memory_limits(memory_limit_mb)
 
@@ -101,6 +105,9 @@ def main(infile: IO, outfile: IO) -> None:
         write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
         sys.exit(-1)
 
+    # Force to cancel dump_traceback_later
+    faulthandler.cancel_dump_traceback_later()
+
 
 if __name__ == "__main__":
     # Read information about how to connect back to the JVM from the 
environment.
diff --git a/python/pyspark/sql/worker/plan_data_source_read.py 
b/python/pyspark/sql/worker/plan_data_source_read.py
index 5edc8185adcf..7c14ebfc53e4 100644
--- a/python/pyspark/sql/worker/plan_data_source_read.py
+++ b/python/pyspark/sql/worker/plan_data_source_read.py
@@ -284,6 +284,7 @@ def main(infile: IO, outfile: IO) -> None:
     via the socket.
     """
     faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
+    tracebackDumpIntervalSeconds = 
os.environ.get("PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS", None)
     try:
         if faulthandler_log_path:
             faulthandler_log_path = os.path.join(faulthandler_log_path, 
str(os.getpid()))
@@ -292,6 +293,9 @@ def main(infile: IO, outfile: IO) -> None:
 
         check_python_version(infile)
 
+        if tracebackDumpIntervalSeconds is not None and 
int(tracebackDumpIntervalSeconds) > 0:
+            
faulthandler.dump_traceback_later(int(tracebackDumpIntervalSeconds), 
repeat=True)
+
         memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", 
"-1"))
         setup_memory_limits(memory_limit_mb)
 
@@ -406,6 +410,9 @@ def main(infile: IO, outfile: IO) -> None:
         write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
         sys.exit(-1)
 
+    # Force to cancel dump_traceback_later
+    faulthandler.cancel_dump_traceback_later()
+
 
 if __name__ == "__main__":
     # Read information about how to connect back to the JVM from the 
environment.
diff --git a/python/pyspark/sql/worker/python_streaming_sink_runner.py 
b/python/pyspark/sql/worker/python_streaming_sink_runner.py
index cf6246b54490..83ba027a0601 100644
--- a/python/pyspark/sql/worker/python_streaming_sink_runner.py
+++ b/python/pyspark/sql/worker/python_streaming_sink_runner.py
@@ -57,6 +57,7 @@ def main(infile: IO, outfile: IO) -> None:
     writer instance, given a list of commit messages.
     """
     faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
+    tracebackDumpIntervalSeconds = 
os.environ.get("PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS", None)
     try:
         if faulthandler_log_path:
             faulthandler_log_path = os.path.join(faulthandler_log_path, 
str(os.getpid()))
@@ -64,6 +65,10 @@ def main(infile: IO, outfile: IO) -> None:
             faulthandler.enable(file=faulthandler_log_file)
 
         check_python_version(infile)
+
+        if tracebackDumpIntervalSeconds is not None and 
int(tracebackDumpIntervalSeconds) > 0:
+            
faulthandler.dump_traceback_later(int(tracebackDumpIntervalSeconds), 
repeat=True)
+
         setup_spark_files(infile)
         setup_broadcasts(infile)
 
@@ -145,6 +150,9 @@ def main(infile: IO, outfile: IO) -> None:
         write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
         sys.exit(-1)
 
+    # Force to cancel dump_traceback_later
+    faulthandler.cancel_dump_traceback_later()
+
 
 if __name__ == "__main__":
     # Read information about how to connect back to the JVM from the 
environment.
diff --git a/python/pyspark/sql/worker/write_into_data_source.py 
b/python/pyspark/sql/worker/write_into_data_source.py
index d6d055f01e54..e2fa147becce 100644
--- a/python/pyspark/sql/worker/write_into_data_source.py
+++ b/python/pyspark/sql/worker/write_into_data_source.py
@@ -76,6 +76,7 @@ def main(infile: IO, outfile: IO) -> None:
     in mapInPandas/mapInArrow back to the JVM.
     """
     faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
+    tracebackDumpIntervalSeconds = 
os.environ.get("PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS", None)
     try:
         if faulthandler_log_path:
             faulthandler_log_path = os.path.join(faulthandler_log_path, 
str(os.getpid()))
@@ -84,6 +85,9 @@ def main(infile: IO, outfile: IO) -> None:
 
         check_python_version(infile)
 
+        if tracebackDumpIntervalSeconds is not None and 
int(tracebackDumpIntervalSeconds) > 0:
+            
faulthandler.dump_traceback_later(int(tracebackDumpIntervalSeconds), 
repeat=True)
+
         memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", 
"-1"))
         setup_memory_limits(memory_limit_mb)
 
@@ -252,6 +256,9 @@ def main(infile: IO, outfile: IO) -> None:
         write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
         sys.exit(-1)
 
+    # Force to cancel dump_traceback_later
+    faulthandler.cancel_dump_traceback_later()
+
 
 if __name__ == "__main__":
     # Read information about how to connect back to the JVM from the 
environment.
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 3454c9855a58..21735d7624bb 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -2348,8 +2348,6 @@ def read_udfs(pickleSer, infile, eval_type):
 def main(infile, outfile):
     faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
     tracebackDumpIntervalSeconds = 
os.environ.get("PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS", None)
-    if tracebackDumpIntervalSeconds is not None:
-        tracebackDumpIntervalSeconds = int(tracebackDumpIntervalSeconds)
     try:
         if faulthandler_log_path:
             faulthandler_log_path = os.path.join(faulthandler_log_path, 
str(os.getpid()))
@@ -2361,8 +2359,8 @@ def main(infile, outfile):
         if split_index == -1:  # for unit tests
             sys.exit(-1)
 
-        if tracebackDumpIntervalSeconds is not None and 
tracebackDumpIntervalSeconds > 0:
-            faulthandler.dump_traceback_later(tracebackDumpIntervalSeconds, 
repeat=True)
+        if tracebackDumpIntervalSeconds is not None and 
int(tracebackDumpIntervalSeconds) > 0:
+            
faulthandler.dump_traceback_later(int(tracebackDumpIntervalSeconds), 
repeat=True)
 
         check_python_version(infile)
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonPlannerRunner.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonPlannerRunner.scala
index 897b520f0b1a..7834b591e2e8 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonPlannerRunner.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonPlannerRunner.scala
@@ -57,6 +57,7 @@ abstract class PythonPlannerRunner[T](func: PythonFunction) 
extends Logging {
     val faultHandlerEnabled: Boolean = 
SQLConf.get.pythonUDFWorkerFaulthandlerEnabled
     val idleTimeoutSeconds: Long = 
SQLConf.get.pythonUDFWorkerIdleTimeoutSeconds
     val killOnIdleTimeout: Boolean = 
SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
+    val tracebackDumpIntervalSeconds: Long = 
SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
     val hideTraceback: Boolean = SQLConf.get.pysparkHideTraceback
     val simplifiedTraceback: Boolean = SQLConf.get.pysparkSimplifiedTraceback
     val workerMemoryMb = SQLConf.get.pythonPlannerExecMemory
@@ -88,6 +89,9 @@ abstract class PythonPlannerRunner[T](func: PythonFunction) 
extends Logging {
     if (faultHandlerEnabled) {
       envVars.put("PYTHON_FAULTHANDLER_DIR", faultHandlerLogDir.toString)
     }
+    if (tracebackDumpIntervalSeconds > 0L) {
+      envVars.put("PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS", 
tracebackDumpIntervalSeconds.toString)
+    }
 
     envVars.put("SPARK_JOB_ARTIFACT_UUID", 
jobArtifactUUID.getOrElse("default"))
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to