Copilot commented on code in PR #64874:
URL: https://github.com/apache/airflow/pull/64874#discussion_r3066479580
##########
task-sdk/tests/task_sdk/execution_time/test_supervisor.py:
##########
@@ -3297,3 +3297,63 @@ def test_nondumpable_noop_on_non_linux():
"""On non-Linux, _make_process_nondumpable returns without error."""
_make_process_nondumpable()
+
+
+class TestChildExecMain:
+ """Test the macOS fork+exec child entry point."""
+
+ def test_uses_fds_012_and_requests_log_channel(self, monkeypatch):
+ """_child_exec_main wraps FDs 0/1/2 as sockets, passes log_fd=0, sets _AIRFLOW_FORK_EXEC."""
+ # _child_exec_main expects FDs 0/1/2 to be sockets (dup2'd by the
+ # parent before exec). It passes log_fd=0 to _fork_main (structured
+ # logging is requested later via ResendLoggingFD).
+ req_a, req_b = socket.socketpair()
+ out_a, out_b = socket.socketpair()
+ err_a, err_b = socket.socketpair()
+
+ # Save originals so we can restore after the test.
+ saved_0 = os.dup(0)
+ saved_1 = os.dup(1)
+ saved_2 = os.dup(2)
+
+ try:
+ os.dup2(req_a.fileno(), 0)
+ os.dup2(out_a.fileno(), 1)
+ os.dup2(err_a.fileno(), 2)
Review Comment:
This test relies on POSIX semantics where sockets are file descriptors that
can be `dup2`’d onto 0/1/2. On Windows, sockets are not regular CRT file
descriptors, and `os.dup2(socket.fileno(), 0)` is likely to fail. Add an
explicit platform guard (e.g., `pytest.mark.skipif(os.name == 'nt', ...)`) or
restructure the test to avoid `dup2` so it doesn’t introduce cross-platform CI
failures.
##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -511,16 +569,46 @@ def start(
del constructor_kwargs
del logger
- try:
- # Run the child entrypoint
- _fork_main(child_requests, child_stdout, child_stderr,
child_logs.fileno(), target)
- except BaseException as e:
- import traceback
-
- with suppress(BaseException):
- # We can't use log here, as if we except out of _fork_main
something _weird_ went on.
- print("Exception in _fork_main, exiting with code 124",
file=sys.stderr)
- traceback.print_exception(type(e), e, e.__traceback__,
file=sys.stderr)
+ if _USE_FORK_EXEC and target is _subprocess_main:
+ # macOS: immediately exec a fresh Python interpreter to replace the
+ # inherited ObjC/CoreFoundation state that is not fork-safe. Only
+ # for task execution (_subprocess_main); DAG processor and triggerer
+ # use different targets and keep bare fork.
+ #
+ # dup2 the socketpairs onto FDs 0/1/2 (which are inheritable by
+ # default and survive across exec). Only the log FD needs to be
Review Comment:
The comment is misleading on modern Python: per PEP 446, newly created FDs
(including sockets) are typically non-inheritable by default, and whether 0/1/2
survive `exec` depends on the `FD_CLOEXEC` flag. What actually makes this safe
is `os.dup2(..., inheritable=True)` (the default), which clears `CLOEXEC` on
the target fd. Consider updating the comment/docstring to reflect that `dup2`
is what ensures the FDs survive `exec`.
```suggestion
# dup2 the socketpairs onto FDs 0/1/2. These duplicated target
# FDs survive the upcoming exec because os.dup2 clears CLOEXEC
# on the destination FDs by default. Only the log FD needs to be
```
##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -511,16 +569,46 @@ def start(
del constructor_kwargs
del logger
- try:
- # Run the child entrypoint
- _fork_main(child_requests, child_stdout, child_stderr,
child_logs.fileno(), target)
- except BaseException as e:
- import traceback
-
- with suppress(BaseException):
- # We can't use log here, as if we except out of _fork_main
something _weird_ went on.
- print("Exception in _fork_main, exiting with code 124",
file=sys.stderr)
- traceback.print_exception(type(e), e, e.__traceback__,
file=sys.stderr)
+ if _USE_FORK_EXEC and target is _subprocess_main:
+ # macOS: immediately exec a fresh Python interpreter to replace the
+ # inherited ObjC/CoreFoundation state that is not fork-safe. Only
+ # for task execution (_subprocess_main); DAG processor and triggerer
+ # use different targets and keep bare fork.
+ #
+ # dup2 the socketpairs onto FDs 0/1/2 (which are inheritable by
+ # default and survive across exec). Only the log FD needs to be
+ # passed explicitly.
Review Comment:
The new macOS-only `fork+exec` branch in `WatchedSubprocess.start()` is not
directly exercised by tests. Consider adding a unit test that forces this
branch (e.g., monkeypatch `_USE_FORK_EXEC=True` and `target=_subprocess_main`)
and monkeypatches `os.execv` to a stub that captures arguments and then raises
a controlled exception; the test can then assert the expected `execv`
invocation and (once addressed) that the child terminates via the intended exit
path.
##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -511,16 +569,46 @@ def start(
del constructor_kwargs
del logger
- try:
- # Run the child entrypoint
- _fork_main(child_requests, child_stdout, child_stderr,
child_logs.fileno(), target)
- except BaseException as e:
- import traceback
-
- with suppress(BaseException):
- # We can't use log here, as if we except out of _fork_main
something _weird_ went on.
- print("Exception in _fork_main, exiting with code 124",
file=sys.stderr)
- traceback.print_exception(type(e), e, e.__traceback__,
file=sys.stderr)
+ if _USE_FORK_EXEC and target is _subprocess_main:
+ # macOS: immediately exec a fresh Python interpreter to replace the
+ # inherited ObjC/CoreFoundation state that is not fork-safe. Only
+ # for task execution (_subprocess_main); DAG processor and triggerer
+ # use different targets and keep bare fork.
+ #
+ # dup2 the socketpairs onto FDs 0/1/2 (which are inheritable by
+ # default and survive across exec). Only the log FD needs to be
+ # passed explicitly.
+ try:
+ os.dup2(child_requests.fileno(), 0)
+ os.dup2(child_stdout.fileno(), 1)
+ os.dup2(child_stderr.fileno(), 2)
+
+ # Log channel FD is NOT passed to the child. The task
+ # runner will request it via ResendLoggingFD after startup.
+ os.execv(sys.executable, [
+ sys.executable,
+ "-c",
+ "from airflow.sdk.execution_time.supervisor import _child_exec_main;"
+ " _child_exec_main()",
+ ])
+ # execv replaces the process -- we never reach here
+ except BaseException as e:
+ import traceback
+
+ with suppress(BaseException):
+ print(f"execv failed, exiting with code 124: {e}",
file=sys.stderr)
+ traceback.print_exception(type(e), e, e.__traceback__,
file=sys.stderr)
+ else:
+ try:
+ # Run the child entrypoint
+ _fork_main(child_requests, child_stdout, child_stderr,
child_logs.fileno(), target)
+ except BaseException as e:
+ import traceback
+
+ with suppress(BaseException):
+ # We can't use log here, as if we except out of
_fork_main something _weird_ went on.
+ print("Exception in _fork_main, exiting with code
124", file=sys.stderr)
+ traceback.print_exception(type(e), e, e.__traceback__,
file=sys.stderr)
Review Comment:
In the forked child process, both exception handlers log an error but then
continue executing. This violates the invariant noted later in the function
(“never exit this block… THINGS GET WEIRD”) and is particularly likely on
`execv` failure. After logging, the child should unconditionally terminate
(e.g., `os._exit(124)`), and ideally close/cleanup any relevant FDs before
exiting to avoid confusing the parent/supervisor.
##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -511,16 +569,46 @@ def start(
del constructor_kwargs
del logger
- try:
- # Run the child entrypoint
- _fork_main(child_requests, child_stdout, child_stderr,
child_logs.fileno(), target)
- except BaseException as e:
- import traceback
-
- with suppress(BaseException):
- # We can't use log here, as if we except out of _fork_main
something _weird_ went on.
- print("Exception in _fork_main, exiting with code 124",
file=sys.stderr)
- traceback.print_exception(type(e), e, e.__traceback__,
file=sys.stderr)
+ if _USE_FORK_EXEC and target is _subprocess_main:
+ # macOS: immediately exec a fresh Python interpreter to replace the
+ # inherited ObjC/CoreFoundation state that is not fork-safe. Only
+ # for task execution (_subprocess_main); DAG processor and triggerer
+ # use different targets and keep bare fork.
+ #
+ # dup2 the socketpairs onto FDs 0/1/2 (which are inheritable by
+ # default and survive across exec). Only the log FD needs to be
+ # passed explicitly.
+ try:
+ os.dup2(child_requests.fileno(), 0)
+ os.dup2(child_stdout.fileno(), 1)
+ os.dup2(child_stderr.fileno(), 2)
+
+ # Log channel FD is NOT passed to the child. The task
+ # runner will request it via ResendLoggingFD after startup.
+ os.execv(sys.executable, [
+ sys.executable,
+ "-c",
+ "from airflow.sdk.execution_time.supervisor import _child_exec_main;"
+ " _child_exec_main()",
+ ])
Review Comment:
The PR description says the child reads FD numbers from
`_AIRFLOW_SUPERVISOR_CHILD_FDS`, but the implementation hard-codes
requests/stdout/stderr to FDs 0/1/2 and uses `_AIRFLOW_FORK_EXEC` to trigger
comms re-init. Please either update the PR description to match the implemented
mechanism, or implement the environment-variable-based FD passing described in
the summary.
##########
task-sdk/tests/task_sdk/execution_time/test_supervisor.py:
##########
@@ -3297,3 +3297,63 @@ def test_nondumpable_noop_on_non_linux():
"""On non-Linux, _make_process_nondumpable returns without error."""
_make_process_nondumpable()
+
+
+class TestChildExecMain:
+ """Test the macOS fork+exec child entry point."""
+
+ def test_uses_fds_012_and_requests_log_channel(self, monkeypatch):
+ """_child_exec_main wraps FDs 0/1/2 as sockets, passes log_fd=0, sets _AIRFLOW_FORK_EXEC."""
+ # _child_exec_main expects FDs 0/1/2 to be sockets (dup2'd by the
+ # parent before exec). It passes log_fd=0 to _fork_main (structured
+ # logging is requested later via ResendLoggingFD).
+ req_a, req_b = socket.socketpair()
+ out_a, out_b = socket.socketpair()
+ err_a, err_b = socket.socketpair()
Review Comment:
This test relies on POSIX semantics where sockets are file descriptors that
can be `dup2`’d onto 0/1/2. On Windows, sockets are not regular CRT file
descriptors, and `os.dup2(socket.fileno(), 0)` is likely to fail. Add an
explicit platform guard (e.g., `pytest.mark.skipif(os.name == 'nt', ...)`) or
restructure the test to avoid `dup2` so it doesn’t introduce cross-platform CI
failures.
##########
task-sdk/tests/task_sdk/execution_time/test_supervisor.py:
##########
@@ -3297,3 +3297,63 @@ def test_nondumpable_noop_on_non_linux():
"""On non-Linux, _make_process_nondumpable returns without error."""
_make_process_nondumpable()
+
+
+class TestChildExecMain:
+ """Test the macOS fork+exec child entry point."""
+
Review Comment:
This test relies on POSIX semantics where sockets are file descriptors that
can be `dup2`’d onto 0/1/2. On Windows, sockets are not regular CRT file
descriptors, and `os.dup2(socket.fileno(), 0)` is likely to fail. Add an
explicit platform guard (e.g., `pytest.mark.skipif(os.name == 'nt', ...)`) or
restructure the test to avoid `dup2` so it doesn’t introduce cross-platform CI
failures.
```suggestion
@pytest.mark.skipif(
os.name == "nt",
reason="Requires POSIX file descriptor semantics for dup2() onto stdio with sockets",
)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]