This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3894d061c82b [SPARK-54740][PYTHON] Start faulthandler early in daemon 
mode
3894d061c82b is described below

commit 3894d061c82beb0cff9181a90d1df80aaafc7c40
Author: Tian Gao <[email protected]>
AuthorDate: Fri Feb 13 06:39:52 2026 +0900

    [SPARK-54740][PYTHON] Start faulthandler early in daemon mode
    
    ### What changes were proposed in this pull request?
    
    1. A new context manager is introduced to enable faulthandler
    2. The faulthandler helper is made re-entrant (nested start/stop calls are allowed)
    3. faulthandler is enabled immediately after fork to cover more of the worker's execution
    4. A minor cosmetic change to the condition of `authenticated`
    
    ### Why are the changes needed?
    
    We should have as much faulthandler coverage as possible. If the worker 
gets stuck before we start faulthandler (after `fork` for example), we don't 
have much insight into the worker.
    
    Note that there was earlier discussion about when to enable the 
periodic dump. That was a concern when the feature was enabled by default, 
but it is now more of a debugging feature that users should turn on only when 
they need to know what really happened in their worker process. Waiting for 
communication is also a state that requires attention, so I think we should not 
rule that out.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    CI.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #53513 from gaogaotiantian/faulthandler-extra.
    
    Authored-by: Tian Gao <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 python/pyspark/daemon.py | 90 +++++++++++++++++++++++++-----------------------
 python/pyspark/util.py   | 19 +++++++++-
 2 files changed, 64 insertions(+), 45 deletions(-)

diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index 9b0702bce74e..f92bb55476d1 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -30,6 +30,7 @@ from socket import AF_INET, AF_INET6, SOCK_STREAM, SOMAXCONN
 from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN, SIGINT
 
 from pyspark.serializers import read_int, write_int, write_with_length, 
UTF8Deserializer
+from pyspark.util import enable_faulthandler
 from pyspark.errors import PySparkRuntimeError
 
 
@@ -226,53 +227,54 @@ def manager():
 
                 if pid == 0:
                     # in child process
-                    if poller is not None:
-                        poller.unregister(0)
-                        poller.unregister(listen_sock)
-                    listen_sock.close()
+                    with enable_faulthandler():
+                        if poller is not None:
+                            poller.unregister(0)
+                            poller.unregister(listen_sock)
+                        listen_sock.close()
 
-                    # It should close the standard input in the child process 
so that
-                    # Python native function executions stay intact.
-                    #
-                    # Note that if we just close the standard input (file 
descriptor 0),
-                    # the lowest file descriptor (file descriptor 0) will be 
allocated,
-                    # later when other file descriptors should happen to open.
-                    #
-                    # Therefore, here we redirects it to '/dev/null' by 
duplicating
-                    # another file descriptor for '/dev/null' to the standard 
input (0).
-                    # See SPARK-26175.
-                    devnull = open(os.devnull, "r")
-                    os.dup2(devnull.fileno(), 0)
-                    devnull.close()
+                        # It should close the standard input in the child 
process so that
+                        # Python native function executions stay intact.
+                        #
+                        # Note that if we just close the standard input (file 
descriptor 0),
+                        # the lowest file descriptor (file descriptor 0) will 
be allocated,
+                        # later when other file descriptors should happen to 
open.
+                        #
+                        # Therefore, here we redirects it to '/dev/null' by 
duplicating
+                        # another file descriptor for '/dev/null' to the 
standard input (0).
+                        # See SPARK-26175.
+                        devnull = open(os.devnull, "r")
+                        os.dup2(devnull.fileno(), 0)
+                        devnull.close()
 
-                    try:
-                        # Acknowledge that the fork was successful
-                        outfile = sock.makefile(mode="wb")
-                        write_int(os.getpid(), outfile)
-                        outfile.flush()
-                        outfile.close()
-                        authenticated = (
-                            os.environ.get("PYTHON_UNIX_DOMAIN_ENABLED", 
"false").lower() == "true"
-                            or False
-                        )
-                        while True:
-                            code = worker(sock, authenticated)
-                            if code == 0:
-                                authenticated = True
-                            if not reuse or code:
-                                # wait for closing
-                                try:
-                                    while sock.recv(1024):
+                        try:
+                            # Acknowledge that the fork was successful
+                            outfile = sock.makefile(mode="wb")
+                            write_int(os.getpid(), outfile)
+                            outfile.flush()
+                            outfile.close()
+                            authenticated = (
+                                os.environ.get("PYTHON_UNIX_DOMAIN_ENABLED", 
"false").lower()
+                                == "true"
+                            )
+                            while True:
+                                code = worker(sock, authenticated)
+                                if code == 0:
+                                    authenticated = True
+                                if not reuse or code:
+                                    # wait for closing
+                                    try:
+                                        while sock.recv(1024):
+                                            pass
+                                    except Exception:
                                         pass
-                                except Exception:
-                                    pass
-                                break
-                            gc.collect()
-                    except BaseException:
-                        traceback.print_exc()
-                        os._exit(1)
-                    else:
-                        os._exit(0)
+                                    break
+                                gc.collect()
+                        except BaseException:
+                            traceback.print_exc()
+                            os._exit(1)
+                        else:
+                            os._exit(0)
                 else:
                     sock.close()
 
diff --git a/python/pyspark/util.py b/python/pyspark/util.py
index e560d7927b9f..d3d6e956cb51 100644
--- a/python/pyspark/util.py
+++ b/python/pyspark/util.py
@@ -962,10 +962,13 @@ class _FaulthandlerHelper:
         self._log_path: Optional[str] = None
         self._log_file: Optional[TextIO] = None
         self._periodic_traceback = False
+        self._reentry_depth = 0
 
     def start(self) -> None:
+        self._reentry_depth += 1
         if self._log_path:
-            raise Exception("Fault handler is already registered. No second 
registration allowed")
+            # faulthandler is already enabled
+            return
         self._log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
         if self._log_path:
             self._log_path = os.path.join(self._log_path, str(os.getpid()))
@@ -974,6 +977,9 @@ class _FaulthandlerHelper:
             faulthandler.enable(file=self._log_file)
 
     def stop(self) -> None:
+        self._reentry_depth -= 1
+        if self._reentry_depth > 0:
+            return
         if self._log_path:
             faulthandler.disable()
             if self._log_file:
@@ -1016,10 +1022,21 @@ class _FaulthandlerHelper:
 
         return wrapper
 
+    @contextmanager
+    def enable_faulthandler(self, start_periodic_traceback: bool = True) -> 
Iterator[None]:
+        try:
+            self.start()
+            if start_periodic_traceback:
+                self.start_periodic_traceback()
+            yield
+        finally:
+            self.stop()
+
 
 _faulthandler_helper = _FaulthandlerHelper()
 with_faulthandler = _faulthandler_helper.with_faulthandler
 start_faulthandler_periodic_traceback = 
_faulthandler_helper.start_periodic_traceback
+enable_faulthandler = _faulthandler_helper.enable_faulthandler
 
 
 if __name__ == "__main__":


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to