This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 3894d061c82b [SPARK-54740][PYTHON] Start faulthandler early in daemon mode
3894d061c82b is described below
commit 3894d061c82beb0cff9181a90d1df80aaafc7c40
Author: Tian Gao <[email protected]>
AuthorDate: Fri Feb 13 06:39:52 2026 +0900
[SPARK-54740][PYTHON] Start faulthandler early in daemon mode
### What changes were proposed in this pull request?
1. A new context manager is introduced to enable faulthandler (a minimal sketch of the pattern follows below)
2. The faulthandler helper is made reentrancy-friendly
3. faulthandler is enabled immediately after fork to cover more of the worker's lifetime
4. A minor cosmetic change to the condition computing `authenticated`
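For illustration only, here is a minimal sketch of the reentrancy pattern described above, not the actual `pyspark.util._FaulthandlerHelper` code: a depth counter lets nested `with` blocks share a single `faulthandler.enable()`/`disable()` pair, so the daemon can enable the handler right after `fork` and the worker can enter the same context again later without error. The names `_depth` and `faulthandler_enabled` are illustrative.

```python
# Hedged sketch: a reentrancy-friendly faulthandler context manager.
import faulthandler
import sys
from contextlib import contextmanager

_depth = 0

@contextmanager
def faulthandler_enabled(file=sys.stderr):
    global _depth
    _depth += 1
    try:
        if _depth == 1:          # only the outermost entry enables
            faulthandler.enable(file=file)
        yield
    finally:
        _depth -= 1
        if _depth == 0:          # only the last exit disables
            faulthandler.disable()
```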
### Why are the changes needed?
We should have as much faulthandler coverage as possible. If the worker
gets stuck before we start faulthandler (right after `fork`, for example), we don't
have much insight into the worker.
Note that there was some discussion in the past about when to enable the
periodic dump. That was a concern when the feature was enabled by default,
but it is now more of a debugging feature: users should only turn it on when
they need to know what is really happening in their worker process. Waiting for
communication is also a state that deserves attention, so I think we should not
rule that out.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #53513 from gaogaotiantian/faulthandler-extra.
Authored-by: Tian Gao <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/pyspark/daemon.py | 90 +++++++++++++++++++++++++-----------------------
python/pyspark/util.py | 19 +++++++++-
2 files changed, 64 insertions(+), 45 deletions(-)
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index 9b0702bce74e..f92bb55476d1 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -30,6 +30,7 @@ from socket import AF_INET, AF_INET6, SOCK_STREAM, SOMAXCONN
from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN, SIGINT
from pyspark.serializers import read_int, write_int, write_with_length, UTF8Deserializer
+from pyspark.util import enable_faulthandler
from pyspark.errors import PySparkRuntimeError
@@ -226,53 +227,54 @@ def manager():
if pid == 0:
# in child process
- if poller is not None:
- poller.unregister(0)
- poller.unregister(listen_sock)
- listen_sock.close()
+ with enable_faulthandler():
+ if poller is not None:
+ poller.unregister(0)
+ poller.unregister(listen_sock)
+ listen_sock.close()
- # It should close the standard input in the child process so that
- # Python native function executions stay intact.
- #
- # Note that if we just close the standard input (file descriptor 0),
- # the lowest file descriptor (file descriptor 0) will be allocated,
- # later when other file descriptors should happen to open.
- #
- # Therefore, here we redirects it to '/dev/null' by duplicating
- # another file descriptor for '/dev/null' to the standard input (0).
- # See SPARK-26175.
- devnull = open(os.devnull, "r")
- os.dup2(devnull.fileno(), 0)
- devnull.close()
+ # It should close the standard input in the child process so that
+ # Python native function executions stay intact.
+ #
+ # Note that if we just close the standard input (file descriptor 0),
+ # the lowest file descriptor (file descriptor 0) will be allocated,
+ # later when other file descriptors should happen to open.
+ #
+ # Therefore, here we redirects it to '/dev/null' by duplicating
+ # another file descriptor for '/dev/null' to the standard input (0).
+ # See SPARK-26175.
+ devnull = open(os.devnull, "r")
+ os.dup2(devnull.fileno(), 0)
+ devnull.close()
- try:
- # Acknowledge that the fork was successful
- outfile = sock.makefile(mode="wb")
- write_int(os.getpid(), outfile)
- outfile.flush()
- outfile.close()
- authenticated = (
- os.environ.get("PYTHON_UNIX_DOMAIN_ENABLED", "false").lower() == "true"
- or False
- )
- while True:
- code = worker(sock, authenticated)
- if code == 0:
- authenticated = True
- if not reuse or code:
- # wait for closing
- try:
- while sock.recv(1024):
+ try:
+ # Acknowledge that the fork was successful
+ outfile = sock.makefile(mode="wb")
+ write_int(os.getpid(), outfile)
+ outfile.flush()
+ outfile.close()
+ authenticated = (
+ os.environ.get("PYTHON_UNIX_DOMAIN_ENABLED", "false").lower()
+ == "true"
+ )
+ while True:
+ code = worker(sock, authenticated)
+ if code == 0:
+ authenticated = True
+ if not reuse or code:
+ # wait for closing
+ try:
+ while sock.recv(1024):
+ pass
+ except Exception:
pass
- except Exception:
- pass
- break
- gc.collect()
- except BaseException:
- traceback.print_exc()
- os._exit(1)
- else:
- os._exit(0)
+ break
+ gc.collect()
+ except BaseException:
+ traceback.print_exc()
+ os._exit(1)
+ else:
+ os._exit(0)
else:
sock.close()
diff --git a/python/pyspark/util.py b/python/pyspark/util.py
index e560d7927b9f..d3d6e956cb51 100644
--- a/python/pyspark/util.py
+++ b/python/pyspark/util.py
@@ -962,10 +962,13 @@ class _FaulthandlerHelper:
self._log_path: Optional[str] = None
self._log_file: Optional[TextIO] = None
self._periodic_traceback = False
+ self._reentry_depth = 0
def start(self) -> None:
+ self._reentry_depth += 1
if self._log_path:
- raise Exception("Fault handler is already registered. No second registration allowed")
+ # faulthandler is already enabled
+ return
self._log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
if self._log_path:
self._log_path = os.path.join(self._log_path, str(os.getpid()))
@@ -974,6 +977,9 @@ class _FaulthandlerHelper:
faulthandler.enable(file=self._log_file)
def stop(self) -> None:
+ self._reentry_depth -= 1
+ if self._reentry_depth > 0:
+ return
if self._log_path:
faulthandler.disable()
if self._log_file:
@@ -1016,10 +1022,21 @@ class _FaulthandlerHelper:
return wrapper
+ @contextmanager
+ def enable_faulthandler(self, start_periodic_traceback: bool = True) -> Iterator[None]:
+ try:
+ self.start()
+ if start_periodic_traceback:
+ self.start_periodic_traceback()
+ yield
+ finally:
+ self.stop()
+
_faulthandler_helper = _FaulthandlerHelper()
with_faulthandler = _faulthandler_helper.with_faulthandler
start_faulthandler_periodic_traceback = _faulthandler_helper.start_periodic_traceback
+enable_faulthandler = _faulthandler_helper.enable_faulthandler
if __name__ == "__main__":
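As a usage illustration (a hedged sketch against the API added in the diff above, not an official example): with `PYTHON_FAULTHANDLER_DIR` set, `enable_faulthandler()` writes a per-PID dump file under that directory and, by default, also starts the periodic traceback; when the variable is unset, the guard in `_FaulthandlerHelper.start` skips enabling. The directory path below is an assumption made for the sketch.

```python
# Hedged usage sketch of the context manager exported by pyspark.util.
import os
from pyspark.util import enable_faulthandler

# Assumed, writable directory; a per-PID dump file is created inside it.
os.environ["PYTHON_FAULTHANDLER_DIR"] = "/tmp/pyspark-faulthandler"

with enable_faulthandler():                                    # outermost entry enables
    with enable_faulthandler(start_periodic_traceback=False):  # nested entry is now allowed
        pass  # a crash or hang in here would show up in the dump file
```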
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]