This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ac038a1aa90 Allow virtualenv code to access connections/variables and
send logs (#58148)
ac038a1aa90 is described below
commit ac038a1aa9094bad0beb31e9636caf77a55db8e2
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Fri Nov 14 09:48:48 2025 +0000
Allow virtualenv code to access connections/variables and send logs (#58148)
(This is the second attempt at this PR)
This change is quite simple: it calls the function added in #57212 and #58147
if it is available. Until a release containing that function ships, the
getattr will return None and this code will be a no-op.
I didn't use the walrus operator in the virtualenv script template, as
theoretically someone could be using it to run in a Python that doesn't
support it.
Fixes #51422, fixes #54706.
This doesn't have any unit tests because a) it's relatively simple, and b) the
tests run everything in the same process, so there is no socket to reconnect
to and we can't actually exercise this code.
---
.../airflow/providers/standard/operators/python.py | 2 ++
.../providers/standard/utils/python_virtualenv.py | 2 +-
.../standard/utils/python_virtualenv_script.jinja2 | 21 ++++++++++++++++++---
.../tests/unit/standard/operators/test_python.py | 22 ++++++++++++----------
4 files changed, 33 insertions(+), 14 deletions(-)
diff --git
a/providers/standard/src/airflow/providers/standard/operators/python.py
b/providers/standard/src/airflow/providers/standard/operators/python.py
index 160617c2b78..b597ef38cc5 100644
--- a/providers/standard/src/airflow/providers/standard/operators/python.py
+++ b/providers/standard/src/airflow/providers/standard/operators/python.py
@@ -562,6 +562,8 @@ class _BasePythonVirtualenvOperator(PythonOperator,
metaclass=ABCMeta):
)
env_vars = dict(os.environ) if self.inherit_env else {}
+ if fd := os.getenv("__AIRFLOW_SUPERVISOR_FD"):
+ env_vars["__AIRFLOW_SUPERVISOR_FD"] = fd
if self.env_vars:
env_vars.update(self.env_vars)
diff --git
a/providers/standard/src/airflow/providers/standard/utils/python_virtualenv.py
b/providers/standard/src/airflow/providers/standard/utils/python_virtualenv.py
index ee71f33a560..891b3e0bce3 100644
---
a/providers/standard/src/airflow/providers/standard/utils/python_virtualenv.py
+++
b/providers/standard/src/airflow/providers/standard/utils/python_virtualenv.py
@@ -150,7 +150,7 @@ def _execute_in_subprocess(cmd: list[str], cwd: str | None
= None, env: dict[str
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=0,
- close_fds=True,
+ close_fds=False,
cwd=cwd,
env=env,
) as proc:
diff --git
a/providers/standard/src/airflow/providers/standard/utils/python_virtualenv_script.jinja2
b/providers/standard/src/airflow/providers/standard/utils/python_virtualenv_script.jinja2
index cb4b738138f..8cb3ace35aa 100644
---
a/providers/standard/src/airflow/providers/standard/utils/python_virtualenv_script.jinja2
+++
b/providers/standard/src/airflow/providers/standard/utils/python_virtualenv_script.jinja2
@@ -40,6 +40,23 @@ if sys.version_info >= (3,6):
pass
{% endif %}
+try:
+ from airflow.sdk.execution_time import task_runner
+except ModuleNotFoundError:
+ pass
+else:
+ {#-
+ We are in an Airflow 3.x environment, try and set up supervisor comms so
+ virtualenv can access Vars/Conn/XCom/etc that normal tasks can
+
+ We don't use the walrus operator (`:=`) below as it is possible people may
+ be using this on pre-3.8 versions of Python, and while Airflow doesn't
+ support them, it's easy to avoid breaking them by not using that operator.
+ #}
+ reinit_supervisor_comms = getattr(task_runner, "reinit_supervisor_comms",
None)
+ if reinit_supervisor_comms:
+ reinit_supervisor_comms()
+
# Script
{{ python_callable_source }}
@@ -49,12 +66,10 @@ if sys.version_info >= (3,6):
import types
{{ modified_dag_module_name }} = types.ModuleType("{{
modified_dag_module_name }}")
-
{{ modified_dag_module_name }}.{{ python_callable }} = {{ python_callable }}
-
sys.modules["{{modified_dag_module_name}}"] = {{modified_dag_module_name}}
-{% endif%}
+{%- endif -%}
{% if op_args or op_kwargs %}
with open(sys.argv[1], "rb") as file:
diff --git a/providers/standard/tests/unit/standard/operators/test_python.py
b/providers/standard/tests/unit/standard/operators/test_python.py
index 7526b5128f5..9787b85a6c6 100644
--- a/providers/standard/tests/unit/standard/operators/test_python.py
+++ b/providers/standard/tests/unit/standard/operators/test_python.py
@@ -201,6 +201,7 @@ class BasePythonTest:
def run_as_task(self, fn, return_ti=False, **kwargs):
"""Create TaskInstance and run it."""
ti = self.create_ti(fn, **kwargs)
+ assert ti.task is not None
ti.run()
if return_ti:
return ti
@@ -976,15 +977,16 @@ class BaseTestPythonVirtualenvOperator(BasePythonTest):
def f():
return None
- task = self.run_as_task(f)
- assert task.execute_callable() is None
+ ti = self.run_as_task(f, return_ti=True)
+ assert ti.xcom_pull() is None
def test_return_false(self):
def f():
return False
- task = self.run_as_task(f)
- assert task.execute_callable() is False
+ ti = self.run_as_task(f, return_ti=True)
+
+ assert ti.xcom_pull() is False
def test_lambda(self):
with pytest.raises(
@@ -1149,8 +1151,8 @@ class BaseTestPythonVirtualenvOperator(BasePythonTest):
return os.environ["MY_ENV_VAR"]
- task = self.run_as_task(f, env_vars={"MY_ENV_VAR": "ABCDE"})
- assert task.execute_callable() == "ABCDE"
+ ti = self.run_as_task(f, env_vars={"MY_ENV_VAR": "ABCDE"},
return_ti=True)
+ assert ti.xcom_pull() == "ABCDE"
def test_environment_variables_with_inherit_env_true(self, monkeypatch):
monkeypatch.setenv("MY_ENV_VAR", "QWERT")
@@ -1160,8 +1162,8 @@ class BaseTestPythonVirtualenvOperator(BasePythonTest):
return os.environ["MY_ENV_VAR"]
- task = self.run_as_task(f, inherit_env=True)
- assert task.execute_callable() == "QWERT"
+ ti = self.run_as_task(f, inherit_env=True, return_ti=True)
+ assert ti.xcom_pull() == "QWERT"
def test_environment_variables_with_inherit_env_false(self, monkeypatch):
monkeypatch.setenv("MY_ENV_VAR", "TYUIO")
@@ -1182,8 +1184,8 @@ class BaseTestPythonVirtualenvOperator(BasePythonTest):
return os.environ["MY_ENV_VAR"]
- task = self.run_as_task(f, env_vars={"MY_ENV_VAR": "EFGHI"},
inherit_env=True)
- assert task.execute_callable() == "EFGHI"
+ ti = self.run_as_task(f, env_vars={"MY_ENV_VAR": "EFGHI"},
inherit_env=True, return_ti=True)
+ assert ti.xcom_pull() == "EFGHI"
venv_cache_path = tempfile.mkdtemp(prefix="venv_cache_path")