kaxil commented on code in PR #64874:
URL: https://github.com/apache/airflow/pull/64874#discussion_r3060091816


##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -441,6 +449,56 @@ def exit(n: int) -> NoReturn:
             exit(125)
 
 
+_USE_FORK_EXEC = sys.platform == "darwin"
+"""On macOS, use fork + exec (``os.fork`` then ``os.execv``) instead of bare ``os.fork``.
+
+macOS system libraries (Security.framework, CoreFoundation, ``_scproxy``) use
+Objective-C, which is not fork-safe.  A bare ``os.fork()`` copies the parent's
+ObjC runtime state; if the child then triggers ObjC class initialization
+(e.g. via ``socket.getaddrinfo`` -> system DNS resolver -> proxy lookup), the
+runtime detects the corrupted state and crashes with SIGABRT.
+
+Calling ``os.execv`` immediately after ``os.fork`` replaces the child's address
+space, giving it clean ObjC state.  The socketpair FDs survive across exec
+because they are marked inheritable; the child reads FD numbers from an env var
+and reconstructs the communication channels.
+
+This applies to all executors (Local, Celery, etc.) but only on macOS and only
+for task execution (``target is _subprocess_main``).  DAG processor and triggerer
+use different targets and keep bare fork -- they don't make network calls that
+trigger the macOS crash.
+
+See: https://github.com/python/cpython/issues/105912
+     https://github.com/apache/airflow/discussions/24463
+"""
+
+# Env var key used to pass socket FDs to the child when using fork+exec.
+_CHILD_FDS_ENV_VAR = "_AIRFLOW_SUPERVISOR_CHILD_FDS"
+
+
+def _child_exec_main():
+    """
+    Entry point for the child process when using fork+exec (macOS).
+
+    Reads socket FD numbers from the environment, reconstructs the sockets,
+    and hands off to ``_fork_main`` -- the same code path as the bare-fork
+    child.
+    """
+    import json
+    import socket as _socket
+
+    fds = json.loads(os.environ.pop(_CHILD_FDS_ENV_VAR))
+    child_requests = _socket.socket(fileno=fds["requests"])
+    child_stdout = _socket.socket(fileno=fds["stdout"])
+    child_stderr = _socket.socket(fileno=fds["stderr"])
+    log_fd = fds["logs"]

Review Comment:
   Good call. Pushed 87883db -- now dup2s onto 0/1/2 before exec (no 
set_inheritable needed for those), and the log channel uses the existing 
`ResendLoggingFD` + `reinit_supervisor_comms()` mechanism rather than a new env 
var. The exec'd child starts with `log_fd=0` (structured logging skipped), sets 
`_AIRFLOW_FORK_EXEC=1`, and `main()` in task_runner.py calls 
`reinit_supervisor_comms()` after `get_startup_details()` to request the log 
channel. Same flow as the sudo/virtualenv re-exec path.
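
   For reference, a standalone sketch (hypothetical demo, not the patch code) of why a socketpair FD dup2'd onto 0/1/2 survives exec -- fds 0-2 are inheritable by default per PEP 446, and `os.dup2` clears close-on-exec on the new fd, so no `set_inheritable` call is needed:

```python
import os
import socket
import sys

# Hypothetical demo, not the actual supervisor code: a socketpair fd
# dup2'd onto fd 0 survives os.execv because fds 0-2 are inheritable
# by default (PEP 446) and dup2 does not set close-on-exec on the target.
parent_sock, child_sock = socket.socketpair()

pid = os.fork()
if pid == 0:
    parent_sock.close()
    # Map the child end onto fd 0, then replace the process image.
    os.dup2(child_sock.fileno(), 0)
    os.execv(
        sys.executable,
        [
            sys.executable,
            "-c",
            # The exec'd interpreter reconstructs the socket from fd 0.
            "import socket; s = socket.socket(fileno=0);"
            " s.sendall(b'hello from exec'); s.close()",
        ],
    )
else:
    child_sock.close()
    msg = parent_sock.recv(1024)  # b'hello from exec'
    os.waitpid(pid, 0)
```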



##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -511,16 +569,49 @@ def start(
             del constructor_kwargs
             del logger
 
-            try:
-                # Run the child entrypoint
-                _fork_main(child_requests, child_stdout, child_stderr, child_logs.fileno(), target)
-            except BaseException as e:
-                import traceback
-
-                with suppress(BaseException):
-                    # We can't use log here, as if we except out of _fork_main something _weird_ went on.
-                    print("Exception in _fork_main, exiting with code 124", file=sys.stderr)
-                    traceback.print_exception(type(e), e, e.__traceback__, file=sys.stderr)
+            if _USE_FORK_EXEC and target is _subprocess_main:
+                # macOS: immediately exec a fresh Python interpreter to replace the
+                # inherited ObjC/CoreFoundation state that is not fork-safe.  Only
+                # for task execution (_subprocess_main); DAG processor and triggerer
+                # use different targets and keep bare fork.  The socketpair FDs
+                # survive across exec because we mark them inheritable.
+                try:
+                    import json
+
+                    fds = {
+                        "requests": child_requests.fileno(),
+                        "stdout": child_stdout.fileno(),
+                        "stderr": child_stderr.fileno(),
+                        "logs": child_logs.fileno(),
+                    }
+                    for fd in fds.values():
+                        os.set_inheritable(fd, True)
+
+                    os.environ[_CHILD_FDS_ENV_VAR] = json.dumps(fds)
+                    os.execv(sys.executable, [
+                        sys.executable,
+                        "-c",
+                        "from airflow.sdk.execution_time.supervisor import _child_exec_main;"
+                        " _child_exec_main()",
+                    ])
+                    # execv replaces the process -- we never reach here
+                except BaseException as e:
+                    import traceback
+
+                    with suppress(BaseException):
+                        print(f"execv failed, exiting with code 124: {e}", file=sys.stderr)
+                        traceback.print_exception(type(e), e, e.__traceback__, file=sys.stderr)
+            else:

Review Comment:
   I think the `else` actually is needed here. `os.execv()` is inside a `try/except BaseException`, so if execv fails, the except prints the error and falls through. Without the `else` we'd also run `_fork_main` on a half-broken macOS child. We could move `os._exit(124)` into the except block so the if-branch always terminates, then drop the else and un-indent -- lmk if you prefer that.
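
   For concreteness, the alternative would look roughly like this (sketch only, simplified from the real code):

```python
import os
import sys
import traceback

# Sketch of the alternative discussed above (not the patch itself):
# with os._exit(124) inside the except, the exec branch can never fall
# through into the bare-fork path, so no else / extra indent is needed.
def exec_or_die(argv: list[str]) -> None:
    try:
        os.execv(argv[0], argv)  # on success, this never returns
    except BaseException:
        traceback.print_exc(file=sys.stderr)
        os._exit(124)  # branch always terminates
```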



##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -511,16 +569,49 @@ def start(
             del constructor_kwargs
             del logger
 
-            try:
-                # Run the child entrypoint
-                _fork_main(child_requests, child_stdout, child_stderr, child_logs.fileno(), target)
-            except BaseException as e:
-                import traceback
-
-                with suppress(BaseException):
-                    # We can't use log here, as if we except out of _fork_main something _weird_ went on.
-                    print("Exception in _fork_main, exiting with code 124", file=sys.stderr)
-                    traceback.print_exception(type(e), e, e.__traceback__, file=sys.stderr)
+            if _USE_FORK_EXEC and target is _subprocess_main:

Review Comment:
   Yeah, fair. Only task execution runs user code that makes network calls (HTTP/DNS); the DAG processor and triggerer don't hit the ObjC crash. A `use_exec: bool = False` kwarg on `start()` would be cleaner and would let callers opt in explicitly. Want me to switch to that?
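
   Something like this -- a sketch of the kwarg shape only, the real `start()` takes many more parameters:

```python
import sys

# Sketch only (not the real signature): shows the explicit opt-in shape,
# where only callers that pass use_exec=True get fork+exec, and only on
# macOS; everyone else keeps the bare-fork default.
def start(target, *, use_exec: bool = False) -> str:
    if use_exec and sys.platform == "darwin":
        return "fork+exec"  # caller opted in; macOS gets the exec path
    return "bare fork"      # default: DAG processor, triggerer, Linux
```

Callers would opt in per call site, e.g. `start(_subprocess_main, use_exec=True)` from the task-execution path.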



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
