This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ac038a1aa90 Allow virtualenv code to access connections/variables and 
send logs (#58148)
ac038a1aa90 is described below

commit ac038a1aa9094bad0beb31e9636caf77a55db8e2
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Fri Nov 14 09:48:48 2025 +0000

    Allow virtualenv code to access connections/variables and send logs (#58148)
    
    (This is the second attempt at this PR)
    
    This change is quite simple, as it simply calls the function added in #57212
    and #58147 if it's available. Up until the point that it is released this 
code
    will not hit the getattr will return None and be a no-op.
    
    I didn't use the walrus operator here as theoretically someone could be 
using
    this to run in a Python that doesn't support it.
    
    Fixes #51422, fixes #54706.
    
    This doesn't have any unit tests, as a) it's relatively simple, and b) due 
to
    how we run in the same process in the tests, there is no socket to reconnect
    to, so we can't actually exercise this code
---
 .../airflow/providers/standard/operators/python.py |  2 ++
 .../providers/standard/utils/python_virtualenv.py  |  2 +-
 .../standard/utils/python_virtualenv_script.jinja2 | 21 ++++++++++++++++++---
 .../tests/unit/standard/operators/test_python.py   | 22 ++++++++++++----------
 4 files changed, 33 insertions(+), 14 deletions(-)

diff --git 
a/providers/standard/src/airflow/providers/standard/operators/python.py 
b/providers/standard/src/airflow/providers/standard/operators/python.py
index 160617c2b78..b597ef38cc5 100644
--- a/providers/standard/src/airflow/providers/standard/operators/python.py
+++ b/providers/standard/src/airflow/providers/standard/operators/python.py
@@ -562,6 +562,8 @@ class _BasePythonVirtualenvOperator(PythonOperator, 
metaclass=ABCMeta):
             )
 
             env_vars = dict(os.environ) if self.inherit_env else {}
+            if fd := os.getenv("__AIRFLOW_SUPERVISOR_FD"):
+                env_vars["__AIRFLOW_SUPERVISOR_FD"] = fd
             if self.env_vars:
                 env_vars.update(self.env_vars)
 
diff --git 
a/providers/standard/src/airflow/providers/standard/utils/python_virtualenv.py 
b/providers/standard/src/airflow/providers/standard/utils/python_virtualenv.py
index ee71f33a560..891b3e0bce3 100644
--- 
a/providers/standard/src/airflow/providers/standard/utils/python_virtualenv.py
+++ 
b/providers/standard/src/airflow/providers/standard/utils/python_virtualenv.py
@@ -150,7 +150,7 @@ def _execute_in_subprocess(cmd: list[str], cwd: str | None 
= None, env: dict[str
         stdout=subprocess.PIPE,
         stderr=subprocess.STDOUT,
         bufsize=0,
-        close_fds=True,
+        close_fds=False,
         cwd=cwd,
         env=env,
     ) as proc:
diff --git 
a/providers/standard/src/airflow/providers/standard/utils/python_virtualenv_script.jinja2
 
b/providers/standard/src/airflow/providers/standard/utils/python_virtualenv_script.jinja2
index cb4b738138f..8cb3ace35aa 100644
--- 
a/providers/standard/src/airflow/providers/standard/utils/python_virtualenv_script.jinja2
+++ 
b/providers/standard/src/airflow/providers/standard/utils/python_virtualenv_script.jinja2
@@ -40,6 +40,23 @@ if sys.version_info >= (3,6):
         pass
 {% endif %}
 
+try:
+    from airflow.sdk.execution_time import task_runner
+except ModuleNotFoundError:
+    pass
+else:
+  {#-
+    We are in an Airflow 3.x environment, try and set up supervisor comms so
+    virtualenv can access Vars/Conn/XCom/etc that normal tasks can
+
+    We don't use the walrus operator (`:=`) below as it is possible people can
+    be using this on pre-3.8 versions of python, and while Airflow doesn't
+    support them, it's easy to not break it not using that operator here.
+  #}
+  reinit_supervisor_comms = getattr(task_runner, "reinit_supervisor_comms", 
None)
+  if reinit_supervisor_comms:
+      reinit_supervisor_comms()
+
 # Script
 {{ python_callable_source }}
 
@@ -49,12 +66,10 @@ if sys.version_info >= (3,6):
 import types
 
 {{ modified_dag_module_name }}  = types.ModuleType("{{ 
modified_dag_module_name }}")
-
 {{ modified_dag_module_name }}.{{ python_callable }} = {{ python_callable }}
-
 sys.modules["{{modified_dag_module_name}}"] = {{modified_dag_module_name}}
 
-{% endif%}
+{%- endif -%}
 
 {% if op_args or op_kwargs %}
 with open(sys.argv[1], "rb") as file:
diff --git a/providers/standard/tests/unit/standard/operators/test_python.py 
b/providers/standard/tests/unit/standard/operators/test_python.py
index 7526b5128f5..9787b85a6c6 100644
--- a/providers/standard/tests/unit/standard/operators/test_python.py
+++ b/providers/standard/tests/unit/standard/operators/test_python.py
@@ -201,6 +201,7 @@ class BasePythonTest:
     def run_as_task(self, fn, return_ti=False, **kwargs):
         """Create TaskInstance and run it."""
         ti = self.create_ti(fn, **kwargs)
+        assert ti.task is not None
         ti.run()
         if return_ti:
             return ti
@@ -976,15 +977,16 @@ class BaseTestPythonVirtualenvOperator(BasePythonTest):
         def f():
             return None
 
-        task = self.run_as_task(f)
-        assert task.execute_callable() is None
+        ti = self.run_as_task(f, return_ti=True)
+        assert ti.xcom_pull() is None
 
     def test_return_false(self):
         def f():
             return False
 
-        task = self.run_as_task(f)
-        assert task.execute_callable() is False
+        ti = self.run_as_task(f, return_ti=True)
+
+        assert ti.xcom_pull() is False
 
     def test_lambda(self):
         with pytest.raises(
@@ -1149,8 +1151,8 @@ class BaseTestPythonVirtualenvOperator(BasePythonTest):
 
             return os.environ["MY_ENV_VAR"]
 
-        task = self.run_as_task(f, env_vars={"MY_ENV_VAR": "ABCDE"})
-        assert task.execute_callable() == "ABCDE"
+        ti = self.run_as_task(f, env_vars={"MY_ENV_VAR": "ABCDE"}, 
return_ti=True)
+        assert ti.xcom_pull() == "ABCDE"
 
     def test_environment_variables_with_inherit_env_true(self, monkeypatch):
         monkeypatch.setenv("MY_ENV_VAR", "QWERT")
@@ -1160,8 +1162,8 @@ class BaseTestPythonVirtualenvOperator(BasePythonTest):
 
             return os.environ["MY_ENV_VAR"]
 
-        task = self.run_as_task(f, inherit_env=True)
-        assert task.execute_callable() == "QWERT"
+        ti = self.run_as_task(f, inherit_env=True, return_ti=True)
+        assert ti.xcom_pull() == "QWERT"
 
     def test_environment_variables_with_inherit_env_false(self, monkeypatch):
         monkeypatch.setenv("MY_ENV_VAR", "TYUIO")
@@ -1182,8 +1184,8 @@ class BaseTestPythonVirtualenvOperator(BasePythonTest):
 
             return os.environ["MY_ENV_VAR"]
 
-        task = self.run_as_task(f, env_vars={"MY_ENV_VAR": "EFGHI"}, 
inherit_env=True)
-        assert task.execute_callable() == "EFGHI"
+        ti = self.run_as_task(f, env_vars={"MY_ENV_VAR": "EFGHI"}, 
inherit_env=True, return_ti=True)
+        assert ti.xcom_pull() == "EFGHI"
 
 
 venv_cache_path = tempfile.mkdtemp(prefix="venv_cache_path")

Reply via email to