andreahlert opened a new pull request, #61629:
URL: https://github.com/apache/airflow/pull/61629

   ## What
   
   Fixes `PythonVirtualenvOperator` (and `ExternalPythonOperator`) failing to 
access `Variable.get()`, `Connection.get()`, and XCom via the Task SDK in 
Airflow 3.x environments (particularly on Kubernetes).
   
   Closes: #58724
   
   ## Why
   
   In the normal forked execution path, the supervisor communicates with the 
task process via a Unix socketpair mapped to fd 0 (stdin). However, the 
`__AIRFLOW_SUPERVISOR_FD` environment variable is never set in this path; it 
is only set by `InProcessTestSupervisor` when using `dag.test()`.
   
   When `PythonVirtualenvOperator` launches a subprocess via 
`Popen(close_fds=False)`, fd 0 is technically inherited, but the virtualenv 
subprocess has no reliable way to know which fd carries the supervisor comms 
channel. The `reinit_supervisor_comms()` function defaults to fd 0, but this is 
fragile and breaks when:
   1. The `airflow.sdk.execution_time.task_runner` import fails with 
`ImportError` (not just `ModuleNotFoundError`) due to missing transitive 
dependencies in the virtualenv
   2. `reinit_supervisor_comms()` itself crashes (e.g., socket validation 
fails), killing the entire virtualenv script
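
To make the fd-0 default concrete, here is a minimal sketch of how a subprocess might resolve which fd to use. It is an illustration only, not the actual `reinit_supervisor_comms()` code: the helper name `resolve_supervisor_fd` is hypothetical, and only the environment variable name and the fallback to fd 0 come from the description above.

```python
import os
from typing import Mapping, Optional

# Name of the env var as given in this PR description.
FD_ENV_VAR = "__AIRFLOW_SUPERVISOR_FD"


def resolve_supervisor_fd(environ: Optional[Mapping[str, str]] = None) -> int:
    """Hypothetical helper: pick the fd that carries the supervisor channel.

    Uses the explicit env var when present and falls back to fd 0 (stdin)
    otherwise, mirroring the default described in the text.
    """
    env = os.environ if environ is None else environ
    return int(env.get(FD_ENV_VAR, "0"))
```

With the variable unset, the subprocess can only guess fd 0; with it set, the choice is explicit.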
   
   Without supervisor comms, the secrets backend falls back to 
`EnvironmentVariablesBackend` only, silently losing access to all DB-stored 
Variables and Connections.
   
   ## How
   
   Two-pronged fix:
   
   ### 1. Operator side (`python.py`)
   In `_BasePythonVirtualenvOperator._execute_python_callable_in_subprocess()`, 
added an `elif` block that:
   - Detects the current `SUPERVISOR_COMMS` socket fd from `task_runner`
   - Marks it as inheritable via `os.set_inheritable()`
   - Propagates it to the subprocess via `__AIRFLOW_SUPERVISOR_FD` env var
   
   This makes the fd propagation explicit rather than relying on implicit fd 0 
inheritance.
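
The three operator-side steps can be sketched in isolation as follows. This is a standalone, POSIX-only illustration and not the code added to `python.py`: the function `run_child_with_inherited_socket` and the child script are hypothetical, and a plain `socket.socketpair()` stands in for the real `SUPERVISOR_COMMS` socket.

```python
import os
import socket
import subprocess
import sys

# Env var name taken from the PR description; everything else is illustrative.
FD_ENV_VAR = "__AIRFLOW_SUPERVISOR_FD"

# Child side: read the fd number from the environment, wrap it in a socket
# object, and echo back whatever the parent sent.
CHILD_SCRIPT = """
import os, socket
fd = int(os.environ["__AIRFLOW_SUPERVISOR_FD"])
sock = socket.socket(fileno=fd)
data = sock.recv(1024)
sock.sendall(b"child got: " + data)
sock.close()
"""


def run_child_with_inherited_socket(message: bytes) -> bytes:
    """Pass one end of a socketpair to a subprocess by fd number (POSIX only)."""
    parent_sock, child_sock = socket.socketpair()
    try:
        fd = child_sock.fileno()
        # Python creates fds as non-inheritable by default, so opt in explicitly.
        os.set_inheritable(fd, True)
        # Tell the child which fd to use instead of relying on a fixed number.
        env = {**os.environ, FD_ENV_VAR: str(fd)}
        proc = subprocess.Popen(
            [sys.executable, "-c", CHILD_SCRIPT],
            env=env,
            close_fds=False,  # keep inheritable fds open in the child
        )
        child_sock.close()  # the parent no longer needs its copy of this end
        parent_sock.sendall(message)
        reply = parent_sock.recv(1024)
        proc.wait(timeout=30)
        return reply
    finally:
        parent_sock.close()
        child_sock.close()
```

Calling `run_child_with_inherited_socket(b"ping")` round-trips a message through the inherited fd. If the `os.set_inheritable()` call is removed, the fd is closed at exec time and the child fails when it tries to wrap it.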
   
   ### 2. Template side (`python_virtualenv_script.jinja2`)
   - Broadened `except ModuleNotFoundError` to `except (ModuleNotFoundError, 
ImportError)` to handle cases where `task_runner` can be found but has failing 
transitive dependencies
   - Wrapped `reinit_supervisor_comms()` call in `try/except Exception` to 
prevent the entire virtualenv script from crashing if socket communication 
setup fails
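
The defensive pattern behind both template changes can be sketched as below. The template itself is not reproduced in this description, so this is an assumption-laden illustration: `try_reinit_comms` is a hypothetical helper, and the module and function names are passed in as parameters so the sketch runs without Airflow installed.

```python
import importlib
import logging

log = logging.getLogger(__name__)


def try_reinit_comms(module_name: str, func_name: str = "reinit_supervisor_comms") -> bool:
    """Hypothetical helper: set up comms if possible, never raise.

    Returns True when the named function ran successfully, False when the
    import or the call failed.
    """
    try:
        module = importlib.import_module(module_name)
    except ImportError as exc:
        # ModuleNotFoundError is a subclass of ImportError, so this clause
        # covers both a missing module and a failing transitive import.
        log.warning("Could not import %s: %s", module_name, exc)
        return False

    try:
        getattr(module, func_name)()
    except Exception:
        # A comms setup failure should not take down the whole script.
        log.warning("Calling %s.%s failed", module_name, func_name, exc_info=True)
        return False
    return True
```

In this sketch the caller can carry on with the user's callable when the helper returns `False`, at the cost of running without supervisor comms.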
   
   ## Testing
   
   The existing test `test_reinit_supervisor_comms` in 
`task-sdk/tests/task_sdk/execution_time/test_supervisor.py` validates the basic 
mechanism (subprocess reinits comms and fetches a connection). The fix ensures 
this mechanism is properly triggered in production by explicitly propagating 
the fd.

