Copilot commented on code in PR #64874:
URL: https://github.com/apache/airflow/pull/64874#discussion_r3066479580


##########
task-sdk/tests/task_sdk/execution_time/test_supervisor.py:
##########
@@ -3297,3 +3297,63 @@ def test_nondumpable_noop_on_non_linux():
     """On non-Linux, _make_process_nondumpable returns without error."""
 
     _make_process_nondumpable()
+
+
+class TestChildExecMain:
+    """Test the macOS fork+exec child entry point."""
+
+    def test_uses_fds_012_and_requests_log_channel(self, monkeypatch):
+        """_child_exec_main wraps FDs 0/1/2 as sockets, passes log_fd=0, sets _AIRFLOW_FORK_EXEC."""
+        # _child_exec_main expects FDs 0/1/2 to be sockets (dup2'd by the
+        # parent before exec).  It passes log_fd=0 to _fork_main (structured
+        # logging is requested later via ResendLoggingFD).
+        req_a, req_b = socket.socketpair()
+        out_a, out_b = socket.socketpair()
+        err_a, err_b = socket.socketpair()
+
+        # Save originals so we can restore after the test.
+        saved_0 = os.dup(0)
+        saved_1 = os.dup(1)
+        saved_2 = os.dup(2)
+
+        try:
+            os.dup2(req_a.fileno(), 0)
+            os.dup2(out_a.fileno(), 1)
+            os.dup2(err_a.fileno(), 2)

Review Comment:
   This test relies on POSIX semantics where sockets are file descriptors that 
can be `dup2`’d onto 0/1/2. On Windows, sockets are not regular CRT file 
descriptors, and `os.dup2(socket.fileno(), 0)` is likely to fail. Add an 
explicit platform guard (e.g., `pytest.mark.skipif(os.name == 'nt', ...)`) or 
restructure the test to avoid `dup2` so it doesn’t introduce cross-platform CI 
failures.
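
For reference, a minimal sketch (not from the PR) of the POSIX behaviour this comment relies on: a socket's `fileno()` is an ordinary file descriptor, so it can be `dup2`'d onto FD 0 and re-wrapped as a socket — exactly the step that fails on Windows, hence the suggested guard.

```python
import os
import socket

# POSIX-only sketch: dup2 a socketpair end onto FD 0, re-wrap FD 0 as a
# socket, and exchange a message through it.
a, b = socket.socketpair()
saved = os.dup(0)  # save the real FD 0 so we can restore it afterwards
try:
    os.dup2(a.fileno(), 0)
    # Re-wrap FD 0 as a socket object (dup so the wrapper owns its own FD).
    wrapped = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, fileno=os.dup(0))
    b.sendall(b"ping")
    received = wrapped.recv(4)
    wrapped.close()
finally:
    os.dup2(saved, 0)  # restore the original stdin
    os.close(saved)
    a.close()
    b.close()
```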



##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -511,16 +569,46 @@ def start(
             del constructor_kwargs
             del logger
 
-            try:
-                # Run the child entrypoint
-                _fork_main(child_requests, child_stdout, child_stderr, child_logs.fileno(), target)
-            except BaseException as e:
-                import traceback
-
-                with suppress(BaseException):
-                    # We can't use log here, as if we except out of _fork_main something _weird_ went on.
-                    print("Exception in _fork_main, exiting with code 124", file=sys.stderr)
-                    traceback.print_exception(type(e), e, e.__traceback__, file=sys.stderr)
+            if _USE_FORK_EXEC and target is _subprocess_main:
+                # macOS: immediately exec a fresh Python interpreter to replace the
+                # inherited ObjC/CoreFoundation state that is not fork-safe.  Only
+                # for task execution (_subprocess_main); DAG processor and triggerer
+                # use different targets and keep bare fork.
+                #
+                # dup2 the socketpairs onto FDs 0/1/2 (which are inheritable by
+                # default and survive across exec).  Only the log FD needs to be

Review Comment:
   The comment is misleading on modern Python: per PEP 446, newly created FDs 
(including sockets) are typically non-inheritable by default, and whether 0/1/2 
survive `exec` depends on the `FD_CLOEXEC` flag. What actually makes this safe 
is `os.dup2(..., inheritable=True)` (the default), which clears `CLOEXEC` on 
the target fd. Consider updating the comment/docstring to reflect that `dup2` 
is what ensures the FDs survive `exec`.
   ```suggestion
                # dup2 the socketpairs onto FDs 0/1/2. These duplicated target
                # FDs survive the upcoming exec because os.dup2 clears CLOEXEC
                # on the destination FDs by default. Only the log FD needs to be
   ```
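
A minimal sketch (assuming CPython >= 3.4) of the semantics this comment describes: per PEP 446 a freshly created socket FD is non-inheritable (`CLOEXEC` set), while `os.dup2(..., inheritable=True)` — the default — clears `CLOEXEC` on the destination FD, which is what actually lets it survive `exec`.

```python
import os
import socket

# Fresh socket FDs are non-inheritable by default (PEP 446).
a, b = socket.socketpair()
fresh_inheritable = a.get_inheritable()  # False

target = os.dup(a.fileno())       # os.dup also returns a non-inheritable FD
before_dup2 = os.get_inheritable(target)
os.dup2(a.fileno(), target)       # default inheritable=True clears CLOEXEC
after_dup2 = os.get_inheritable(target)

os.close(target)
a.close()
b.close()
```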



##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -511,16 +569,46 @@ def start(
             del constructor_kwargs
             del logger
 
-            try:
-                # Run the child entrypoint
-                _fork_main(child_requests, child_stdout, child_stderr, child_logs.fileno(), target)
-            except BaseException as e:
-                import traceback
-
-                with suppress(BaseException):
-                    # We can't use log here, as if we except out of _fork_main something _weird_ went on.
-                    print("Exception in _fork_main, exiting with code 124", file=sys.stderr)
-                    traceback.print_exception(type(e), e, e.__traceback__, file=sys.stderr)
+            if _USE_FORK_EXEC and target is _subprocess_main:
+                # macOS: immediately exec a fresh Python interpreter to replace the
+                # inherited ObjC/CoreFoundation state that is not fork-safe.  Only
+                # for task execution (_subprocess_main); DAG processor and triggerer
+                # use different targets and keep bare fork.
+                #
+                # dup2 the socketpairs onto FDs 0/1/2 (which are inheritable by
+                # default and survive across exec).  Only the log FD needs to be
+                # passed explicitly.

Review Comment:
   The new macOS-only `fork+exec` branch in `WatchedSubprocess.start()` is not 
directly exercised by tests. Consider adding a unit test that forces this 
branch (e.g., monkeypatch `_USE_FORK_EXEC=True` and `target=_subprocess_main`) 
and monkeypatches `os.execv` to a stub that captures arguments and then raises 
a controlled exception; the test can then assert the expected `execv` 
invocation and (once addressed) that the child terminates via the intended exit 
path.
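
A hypothetical sketch of the test shape suggested above: a stub replaces `os.execv`, captures its arguments, and raises a controlled exception instead of replacing the process. (A real test would use `monkeypatch` and drive the fork+exec branch of `WatchedSubprocess.start()`; the direct call and `/fake/python` path here are stand-ins.)

```python
import os

class ExecvCalled(Exception):
    """Raised by the stub so the caller stops where exec would have happened."""

captured = {}

def fake_execv(path, argv):
    captured["path"] = path
    captured["argv"] = argv
    raise ExecvCalled  # never actually replace the test process

real_execv, os.execv = os.execv, fake_execv
try:
    try:
        # Stand-in for the code under test invoking execv.
        os.execv("/fake/python", ["/fake/python", "-c", "pass"])
    except ExecvCalled:
        exec_was_intercepted = True
finally:
    os.execv = real_execv  # always restore the real function
```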



##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -511,16 +569,46 @@ def start(
             del constructor_kwargs
             del logger
 
-            try:
-                # Run the child entrypoint
-                _fork_main(child_requests, child_stdout, child_stderr, child_logs.fileno(), target)
-            except BaseException as e:
-                import traceback
-
-                with suppress(BaseException):
-                    # We can't use log here, as if we except out of _fork_main something _weird_ went on.
-                    print("Exception in _fork_main, exiting with code 124", file=sys.stderr)
-                    traceback.print_exception(type(e), e, e.__traceback__, file=sys.stderr)
+            if _USE_FORK_EXEC and target is _subprocess_main:
+                # macOS: immediately exec a fresh Python interpreter to replace the
+                # inherited ObjC/CoreFoundation state that is not fork-safe.  Only
+                # for task execution (_subprocess_main); DAG processor and triggerer
+                # use different targets and keep bare fork.
+                #
+                # dup2 the socketpairs onto FDs 0/1/2 (which are inheritable by
+                # default and survive across exec).  Only the log FD needs to be
+                # passed explicitly.
+                try:
+                    os.dup2(child_requests.fileno(), 0)
+                    os.dup2(child_stdout.fileno(), 1)
+                    os.dup2(child_stderr.fileno(), 2)
+
+                    # Log channel FD is NOT passed to the child. The task
+                    # runner will request it via ResendLoggingFD after startup.
+                    os.execv(sys.executable, [
+                        sys.executable,
+                        "-c",
+                        "from airflow.sdk.execution_time.supervisor import _child_exec_main;"
+                        " _child_exec_main()",
+                    ])
+                    # execv replaces the process -- we never reach here
+                except BaseException as e:
+                    import traceback
+
+                    with suppress(BaseException):
+                        print(f"execv failed, exiting with code 124: {e}", file=sys.stderr)
+                        traceback.print_exception(type(e), e, e.__traceback__, file=sys.stderr)
+            else:
+                try:
+                    # Run the child entrypoint
+                    _fork_main(child_requests, child_stdout, child_stderr, child_logs.fileno(), target)
+                except BaseException as e:
+                    import traceback
+
+                    with suppress(BaseException):
+                        # We can't use log here, as if we except out of _fork_main something _weird_ went on.
+                        print("Exception in _fork_main, exiting with code 124", file=sys.stderr)
+                        traceback.print_exception(type(e), e, e.__traceback__, file=sys.stderr)

Review Comment:
   In the forked child process, both exception handlers log an error but then 
continue executing. This violates the invariant noted later in the function 
(“never exit this block… THINGS GET WEIRD”) and is particularly likely on 
`execv` failure. After logging, the child should unconditionally terminate 
(e.g., `os._exit(124)`), and ideally close/cleanup any relevant FDs before 
exiting to avoid confusing the parent/supervisor.
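
One possible shape of the fix (a sketch, not the PR's code): a helper that logs and then terminates the child unconditionally via `os._exit(124)`, demonstrated here in a throwaway fork.

```python
import os
import sys
import traceback
from contextlib import suppress

def _report_and_exit(exc: BaseException, exit_code: int = 124) -> None:
    """Log a child-entrypoint failure, then terminate without returning."""
    with suppress(BaseException):
        print(f"child entrypoint failed, exiting with code {exit_code}: {exc}", file=sys.stderr)
        traceback.print_exception(type(exc), exc, exc.__traceback__, file=sys.stderr)
    os._exit(exit_code)  # never returns; skips atexit/finally machinery

# Show in a throwaway forked child that the exit status really is 124.
pid = os.fork()
if pid == 0:
    _report_and_exit(RuntimeError("boom"))
_, status = os.waitpid(pid, 0)
child_exit_code = os.waitstatus_to_exitcode(status)
```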



##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -511,16 +569,46 @@ def start(
             del constructor_kwargs
             del logger
 
-            try:
-                # Run the child entrypoint
-                _fork_main(child_requests, child_stdout, child_stderr, child_logs.fileno(), target)
-            except BaseException as e:
-                import traceback
-
-                with suppress(BaseException):
-                    # We can't use log here, as if we except out of _fork_main something _weird_ went on.
-                    print("Exception in _fork_main, exiting with code 124", file=sys.stderr)
-                    traceback.print_exception(type(e), e, e.__traceback__, file=sys.stderr)
+            if _USE_FORK_EXEC and target is _subprocess_main:
+                # macOS: immediately exec a fresh Python interpreter to replace the
+                # inherited ObjC/CoreFoundation state that is not fork-safe.  Only
+                # for task execution (_subprocess_main); DAG processor and triggerer
+                # use different targets and keep bare fork.
+                #
+                # dup2 the socketpairs onto FDs 0/1/2 (which are inheritable by
+                # default and survive across exec).  Only the log FD needs to be
+                # passed explicitly.
+                try:
+                    os.dup2(child_requests.fileno(), 0)
+                    os.dup2(child_stdout.fileno(), 1)
+                    os.dup2(child_stderr.fileno(), 2)
+
+                    # Log channel FD is NOT passed to the child. The task
+                    # runner will request it via ResendLoggingFD after startup.
+                    os.execv(sys.executable, [
+                        sys.executable,
+                        "-c",
+                        "from airflow.sdk.execution_time.supervisor import _child_exec_main;"
+                        " _child_exec_main()",
+                    ])

Review Comment:
   The PR description says the child reads FD numbers from 
`_AIRFLOW_SUPERVISOR_CHILD_FDS`, but the implementation hard-codes 
requests/stdout/stderr to FDs 0/1/2 and uses `_AIRFLOW_FORK_EXEC` to trigger 
comms re-init. Please either update the PR description to match the implemented 
mechanism, or implement the environment-variable-based FD passing described in 
the summary.



##########
task-sdk/tests/task_sdk/execution_time/test_supervisor.py:
##########
@@ -3297,3 +3297,63 @@ def test_nondumpable_noop_on_non_linux():
     """On non-Linux, _make_process_nondumpable returns without error."""
 
     _make_process_nondumpable()
+
+
+class TestChildExecMain:
+    """Test the macOS fork+exec child entry point."""
+

Review Comment:
   This test relies on POSIX semantics where sockets are file descriptors that 
can be `dup2`’d onto 0/1/2. On Windows, sockets are not regular CRT file 
descriptors, and `os.dup2(socket.fileno(), 0)` is likely to fail. Add an 
explicit platform guard (e.g., `pytest.mark.skipif(os.name == 'nt', ...)`) or 
restructure the test to avoid `dup2` so it doesn’t introduce cross-platform CI 
failures.
   ```suggestion
   
       @pytest.mark.skipif(
           os.name == "nt",
           reason="Requires POSIX file descriptor semantics for dup2() onto stdio with sockets",
       )
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
