This is an automated email from the ASF dual-hosted git repository.
vatsrahul1001 pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-2-test by this push:
new 72fab5c3f5a Fix macOS `SIGSEGV` in task execution by using
`fork`+`exec` (#64874) (#66872)
72fab5c3f5a is described below
commit 72fab5c3f5ad0549d4cf31f9b135cb096ff423ff
Author: Rahul Vats <[email protected]>
AuthorDate: Fri May 15 17:01:01 2026 +0530
Fix macOS `SIGSEGV` in task execution by using `fork`+`exec` (#64874)
(#66872)
On macOS, the task supervisor's bare os.fork() copies the parent's
Objective-C runtime state into the child process. When the child
later triggers ObjC class initialization (e.g. socket.getaddrinfo ->
system DNS resolver -> Security.framework -> +[NSNumber initialize]),
the runtime detects the corrupted state and crashes with SIGABRT/SIGSEGV.
This is a well-documented macOS platform limitation -- Apple's ObjC
runtime, CoreFoundation, and libdispatch are not fork-safe. CPython
changed multiprocessing's default start method to "spawn" on macOS in
3.8 for this reason, but Airflow's TaskSDK supervisor uses os.fork()
directly.
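The fix: on macOS, immediately call os.execv() after os.fork() for
task execution subprocesses. The exec replaces the child's address
space, giving it clean ObjC state. Before exec, the socketpairs are
dup2'd onto FDs 0/1/2; os.dup2() leaves the destination FDs
inheritable, so they survive exec. The structured log channel is not
inherited; the task runner requests it after startup via the existing
ResendLoggingFD mechanism, signalled by the _AIRFLOW_FORK_EXEC
environment variable.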
Only task execution (target=_subprocess_main) uses fork+exec. DAG
processor and triggerer pass different targets and keep bare fork for
now -- they can hit the same crash and need the same treatment as a
follow-up (see https://github.com/apache/airflow/issues/65691).
References:
- https://github.com/python/cpython/issues/105912
- https://github.com/python/cpython/issues/58037
- https://github.com/apache/airflow/discussions/24463
(cherry picked from commit a3383b709466595fe0f44264e241c1f33eb34230)
Co-authored-by: Kaxil Naik <[email protected]>
---
.../src/airflow/sdk/execution_time/supervisor.py | 118 +++++++++++++++++++--
.../src/airflow/sdk/execution_time/task_runner.py | 8 ++
.../task_sdk/execution_time/test_supervisor.py | 63 ++++++++++-
3 files changed, 178 insertions(+), 11 deletions(-)
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index d939234a9a5..0c88bc9812f 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -195,10 +195,18 @@ Note: Only Linux-based distros supported as "Production"
execution environment f
macOS
-----
- 1. Due to limitations in Apple's libraries not every process might 'fork'
safe.
- One of the general error is unable to query the macOS system configuration
for network proxies.
- If your are not using a proxy you could disable it by set environment
variable 'no_proxy' to '*'.
- See: https://github.com/python/cpython/issues/58037 and
https://bugs.python.org/issue30385#msg293958
+ Apple's Objective-C runtime and system frameworks (Security.framework,
+ CoreFoundation, libdispatch) are not safe to use after fork() without exec().
+ Airflow's task supervisor uses fork+exec on macOS to avoid this, but other
+ code paths (DAG processor, triggerer) still use bare fork. If you see
+ crashes mentioning "+[NSNumber initialize]" or "_scproxy", the most likely
+ cause is a bare fork touching Apple frameworks.
+
+ Common triggers: socket.getaddrinfo (DNS resolver), urllib proxy lookup,
+ SSL context initialization via Security.framework.
+
+ See: https://github.com/python/cpython/issues/105912
+ https://github.com/python/cpython/issues/58037
********************************************************************************************************"""
@@ -441,6 +449,55 @@ def _fork_main(
exit(125)
+_FORK_EXEC_PLATFORMS = {"darwin"}
+"""Platforms where we fork+exec instead of bare ``os.fork`` for task execution.
+
+macOS system libraries (Security.framework, CoreFoundation, ``_scproxy``) use
+Objective-C, which is not fork-safe. A bare ``os.fork()`` copies the parent's
+ObjC runtime state; if the child then triggers ObjC class initialization
+(e.g. via ``socket.getaddrinfo`` -> system DNS resolver -> proxy lookup), the
+runtime detects the corrupted state and crashes with SIGABRT.
+
+Calling ``os.execv`` immediately after ``os.fork`` replaces the child's address
+space, giving it clean ObjC state. Before exec, the supervisor ``dup2``s the
+socketpairs onto FDs 0 (requests/stdin), 1 (stdout), 2 (stderr). The
duplicated
+FDs survive the upcoming exec because ``os.dup2(inheritable=True)`` (the
default)
+clears ``FD_CLOEXEC`` on the destination FDs. The log channel is obtained
after
+startup via the existing ``ResendLoggingFD`` mechanism.
+
+Currently only task execution opts in (via ``ActivitySubprocess.start``). DAG
+processor and triggerer can also hit this crash and will need the same
treatment
+as a follow-up (see https://github.com/apache/airflow/issues/65691).
+
+See: https://github.com/python/cpython/issues/105912
+ https://github.com/apache/airflow/discussions/24463
+"""
+
+
+def _child_exec_main():
+ """
+ Entry point for the child process when using fork+exec (macOS).
+
+ After exec, FDs 0/1/2 are already the requests/stdout/stderr sockets
+ (dup2'd by the parent before exec). The log channel is NOT inherited;
+ instead, the task runner requests it from the supervisor via the existing
+ ``ResendLoggingFD`` mechanism after startup.
+ """
+ # FDs 0, 1, 2 were dup2'd onto the socketpairs before exec.
+ child_requests = socket(fileno=0)
+ child_stdout = socket(fileno=1)
+ child_stderr = socket(fileno=2)
+
+ # _fork_main always exits via os._exit(), so the socket objects above are
+ # never GC'd (which would close their underlying FDs). This is safe but
+ # depends on that invariant -- do not refactor _fork_main to return.
+ #
+ # log_fd=0 tells _fork_main to skip structured log channel setup.
+ # Signal to the task runner to request it via ResendLoggingFD after
startup.
+ os.environ["_AIRFLOW_FORK_EXEC"] = "1"
+ _fork_main(child_requests, child_stdout, child_stderr, 0, _subprocess_main)
+
+
@attrs.define(kw_only=True)
class WatchedSubprocess:
"""
@@ -488,9 +545,24 @@ class WatchedSubprocess:
*,
target: Callable[[], None] = _subprocess_main,
logger: FilteringBoundLogger | None = None,
+ use_exec: bool = False,
**constructor_kwargs,
) -> Self:
- """Fork and start a new subprocess with the specified target
function."""
+ """
+ Fork and start a new subprocess with the specified target function.
+
+ :param use_exec: If True, on platforms that need it (currently macOS),
+ immediately ``os.execv`` a fresh Python interpreter after
``os.fork``.
+ This avoids macOS fork-safety issues with Objective-C frameworks.
+ Task execution opts in; DAG processor and triggerer do not.
+
+ The exec'd child always runs ``_subprocess_main``, so ``use_exec=True``
+ is only valid when ``target is _subprocess_main``.
+ """
+ if use_exec and target is not _subprocess_main:
+ raise ValueError(
+ f"use_exec=True is only supported with
target=_subprocess_main; got target={target!r}"
+ )
# Create socketpairs/"pipes" to connect to the stdin and out from the
subprocess
child_stdout, read_stdout = socketpair()
child_stderr, read_stderr = socketpair()
@@ -512,14 +584,34 @@ class WatchedSubprocess:
del logger
try:
- # Run the child entrypoint
- _fork_main(child_requests, child_stdout, child_stderr,
child_logs.fileno(), target)
+ if use_exec:
+ # macOS: exec a fresh Python interpreter to replace the
+ # inherited ObjC/CoreFoundation state that is not
fork-safe.
+ # dup2 copies the socketpairs onto FDs 0/1/2; os.dup2
clears
+ # FD_CLOEXEC on the destination FDs, so they survive exec.
+ # The log channel is requested later via ResendLoggingFD.
+ os.dup2(child_requests.fileno(), 0)
+ os.dup2(child_stdout.fileno(), 1)
+ os.dup2(child_stderr.fileno(), 2)
+ os.execv(
+ sys.executable,
+ [
+ sys.executable,
+ "-c",
+ "from airflow.sdk.execution_time.supervisor import
_child_exec_main;"
+ " _child_exec_main()",
+ ],
+ )
+ # execv replaces the process -- unreachable on success
+ else:
+ # Run the child entrypoint
+ _fork_main(child_requests, child_stdout, child_stderr,
child_logs.fileno(), target)
except BaseException as e:
import traceback
with suppress(BaseException):
- # We can't use log here, as if we except out of _fork_main
something _weird_ went on.
- print("Exception in _fork_main, exiting with code 124",
file=sys.stderr)
+ # We can't use log here, as if we except out of the child
something _weird_ went on.
+ print("Exception in child process, exiting with code 124",
file=sys.stderr)
traceback.print_exception(type(e), e, e.__traceback__,
file=sys.stderr)
# It's really super super important we never exit this block. We
are in the forked child, and if we
@@ -1007,7 +1099,13 @@ class ActivitySubprocess(WatchedSubprocess):
**kwargs,
) -> Self:
"""Fork and start a new subprocess to execute the given task."""
- proc: Self = super().start(id=what.id, client=client, target=target,
logger=logger, **kwargs)
+ # Opt in to fork+exec on platforms that need it (currently macOS).
+ # Tests override `target` with a local stub to exercise the base
+ # infrastructure; keep bare fork for those.
+ use_exec = target is _subprocess_main and sys.platform in
_FORK_EXEC_PLATFORMS
+ proc: Self = super().start(
+ id=what.id, client=client, target=target, logger=logger,
use_exec=use_exec, **kwargs
+ )
# Tell the task process what it needs to do!
proc._on_child_started(
ti=what,
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index f41d4709545..c363c40aa60 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -1891,6 +1891,14 @@ def main():
try:
log.info("::group::Pre Execute")
startup_details = get_startup_details()
+
+ # On macOS fork+exec path, the structured log channel wasn't
+ # inherited (exec replaces the address space). Request it from
+ # the supervisor using the existing ResendLoggingFD mechanism.
+ # Must happen after get_startup_details() so we don't read the
+ # startup message as a ResendLoggingFD response.
+ if os.environ.pop("_AIRFLOW_FORK_EXEC", None) == "1":
+ reinit_supervisor_comms()
span = _make_task_span(msg=startup_details)
stack.enter_context(span)
ti, context, log = startup(msg=startup_details)
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index 85f5faf22fa..49e6a65c2e9 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -64,7 +64,7 @@ from airflow.sdk.api.datamodels._generated import (
TaskInstanceState,
)
from airflow.sdk.exceptions import AirflowRuntimeError, ErrorType,
TaskAlreadyRunningError
-from airflow.sdk.execution_time import task_runner
+from airflow.sdk.execution_time import supervisor, task_runner
from airflow.sdk.execution_time.comms import (
AssetEventsResult,
AssetResult,
@@ -3355,3 +3355,64 @@ def test_nondumpable_noop_on_non_linux():
"""On non-Linux, _make_process_nondumpable returns without error."""
_make_process_nondumpable()
+
+
[email protected]("disable_capturing")
+class TestChildExecMain:
+ """Test the macOS fork+exec child entry point."""
+
+ def test_uses_fds_012_and_requests_log_channel(self, monkeypatch):
+ """_child_exec_main wraps FDs 0/1/2 as sockets, passes log_fd=0, sets
_AIRFLOW_FORK_EXEC."""
+ # _child_exec_main expects FDs 0/1/2 to be sockets (dup2'd by the
+ # parent before exec). It passes log_fd=0 to _fork_main (structured
+ # logging is requested later via ResendLoggingFD).
+ req_a, req_b = socket.socketpair()
+ out_a, out_b = socket.socketpair()
+ err_a, err_b = socket.socketpair()
+
+ # Save originals so we can restore after the test.
+ saved_0 = os.dup(0)
+ saved_1 = os.dup(1)
+ saved_2 = os.dup(2)
+
+ try:
+ os.dup2(req_a.fileno(), 0)
+ os.dup2(out_a.fileno(), 1)
+ os.dup2(err_a.fileno(), 2)
+
+ captured = {}
+
+ def mock_fork_main(requests, stdout, stderr, log_fd, target):
+ captured["requests_fd"] = requests.fileno()
+ captured["stdout_fd"] = stdout.fileno()
+ captured["stderr_fd"] = stderr.fileno()
+ captured["log_fd"] = log_fd
+ captured["target"] = target
+ # Detach so the mock return doesn't double-close FDs that
+ # _child_exec_main's socket objects also own.
+ requests.detach()
+ stdout.detach()
+ stderr.detach()
+
+ monkeypatch.setattr(supervisor, "_fork_main", mock_fork_main)
+
+ supervisor._child_exec_main()
+
+ assert captured["requests_fd"] == 0
+ assert captured["stdout_fd"] == 1
+ assert captured["stderr_fd"] == 2
+ assert captured["log_fd"] == 0
+ assert captured["target"] is supervisor._subprocess_main
+ # _child_exec_main sets this so the task runner knows to request
+ # the log channel via ResendLoggingFD.
+ assert os.environ.pop("_AIRFLOW_FORK_EXEC") == "1"
+ finally:
+ # Restore original FDs.
+ os.dup2(saved_0, 0)
+ os.dup2(saved_1, 1)
+ os.dup2(saved_2, 2)
+ os.close(saved_0)
+ os.close(saved_1)
+ os.close(saved_2)
+ for s in [req_a, req_b, out_a, out_b, err_a, err_b]:
+ s.close()