andreahlert opened a new pull request, #61629: URL: https://github.com/apache/airflow/pull/61629
## What

Fixes `PythonVirtualenvOperator` (and `ExternalPythonOperator`) failing to access `Variable.get()`, `Connection.get()`, and XCom via the Task SDK in Airflow 3.x environments (particularly on Kubernetes).

Closes: #58724

## Why

In the normal forked execution path, the supervisor communicates with the task process via a Unix socketpair mapped to fd 0 (stdin). However, the `__AIRFLOW_SUPERVISOR_FD` environment variable is never set in this path; it is only set by `InProcessTestSupervisor` when using `dag.test()`.

When `PythonVirtualenvOperator` launches a subprocess via `Popen(close_fds=False)`, fd 0 is technically inherited, but the virtualenv subprocess has no reliable way to know which fd carries the supervisor comms channel. The `reinit_supervisor_comms()` function defaults to fd 0, but this is fragile and breaks when:

1. The `airflow.sdk.execution_time.task_runner` import fails with `ImportError` (not just `ModuleNotFoundError`) due to missing transitive dependencies in the virtualenv
2. `reinit_supervisor_comms()` itself crashes (e.g., socket validation fails), killing the entire virtualenv script

Without supervisor comms, the secrets backend falls back to `EnvironmentVariablesBackend` only, silently losing access to all DB-stored Variables and Connections.

## How

Two-pronged fix:

### 1. Operator side (`python.py`)

In `_BasePythonVirtualenvOperator._execute_python_callable_in_subprocess()`, added an `elif` block that:

- Detects the current `SUPERVISOR_COMMS` socket fd from `task_runner`
- Marks it as inheritable via `os.set_inheritable()`
- Propagates it to the subprocess via the `__AIRFLOW_SUPERVISOR_FD` env var

This makes the fd propagation explicit rather than relying on implicit fd 0 inheritance.

### 2. Template side (`python_virtualenv_script.jinja2`)

- Broadened `except ModuleNotFoundError` to `except (ModuleNotFoundError, ImportError)` to handle cases where `task_runner` can be found but has failing transitive dependencies
- Wrapped the `reinit_supervisor_comms()` call in `try/except Exception` to prevent the entire virtualenv script from crashing if socket communication setup fails

## Testing

The existing test `test_reinit_supervisor_comms` in `task-sdk/tests/task_sdk/execution_time/test_supervisor.py` validates the basic mechanism (the subprocess reinitializes comms and fetches a connection). The fix ensures this mechanism is properly triggered in production by explicitly propagating the fd.
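## Illustration

For reviewers who want to see the fd propagation idea in isolation, here is a minimal standalone sketch. It is not the code from this PR and does not import Airflow. The function `run_child_with_explicit_fd`, the `CHILD_SCRIPT` string, and the message sent over the socket are all hypothetical; only the `__AIRFLOW_SUPERVISOR_FD` name, `os.set_inheritable()`, and `close_fds=False` come from the description above. It assumes a Unix-like platform.

```python
import os
import socket
import subprocess
import sys

# Env var name taken from the PR description; everything else is illustrative.
ENV_KEY = "__AIRFLOW_SUPERVISOR_FD"

# Hypothetical child script: read the fd from the environment, fall back to
# fd 0, and guard the socket setup so a failure does not kill the script.
CHILD_SCRIPT = """
import os, socket
fd = int(os.environ.get("__AIRFLOW_SUPERVISOR_FD", "0"))
try:
    sock = socket.socket(fileno=fd)  # wrap the inherited fd
    sock.sendall(b"hello from child on fd %d" % fd)
except Exception as exc:
    print("comms setup failed:", exc)
"""


def run_child_with_explicit_fd() -> bytes:
    """Launch a subprocess that talks back over an explicitly advertised fd."""
    parent_sock, child_sock = socket.socketpair()
    try:
        fd = child_sock.fileno()
        # Sockets are non-inheritable by default, so opt in explicitly.
        os.set_inheritable(fd, True)
        # Tell the child which fd carries the channel instead of assuming fd 0.
        env = {**os.environ, ENV_KEY: str(fd)}
        subprocess.run(
            [sys.executable, "-c", CHILD_SCRIPT],
            env=env,
            close_fds=False,  # keep inheritable fds open in the child
            check=True,
        )
        parent_sock.settimeout(5)  # avoid hanging if the child sent nothing
        return parent_sock.recv(1024)
    finally:
        child_sock.close()
        parent_sock.close()


if __name__ == "__main__":
    print(run_child_with_explicit_fd())
```

The child side mirrors the template change in spirit: the environment lookup makes the channel explicit, and the `try/except` keeps a comms failure from aborting the rest of the script.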
