This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2f48bc9bf8 Bugfix/prevent concurrency with cached venv (#35258)
2f48bc9bf8 is described below
commit 2f48bc9bf825f26e310267e3d798016619c67a2e
Author: Jens Scheffler <[email protected]>
AuthorDate: Mon Oct 30 12:19:31 2023 +0100
Bugfix/prevent concurrency with cached venv (#35258)
Revert #35252 and ensure venv cache files are created in a dedicated tmp
folder
---
airflow/operators/python.py | 105 ++++++++++++++++++++---------------------
tests/operators/test_python.py | 33 -------------
2 files changed, 51 insertions(+), 87 deletions(-)
diff --git a/airflow/operators/python.py b/airflow/operators/python.py
index 28c3adedc7..5a4cf9c2a2 100644
--- a/airflow/operators/python.py
+++ b/airflow/operators/python.py
@@ -422,57 +422,56 @@ class _BasePythonVirtualenvOperator(PythonOperator,
metaclass=ABCMeta):
memo[id(self.pickling_library)] = self.pickling_library
return super().__deepcopy__(memo)
- def _execute_python_callable_in_subprocess(self, python_path: Path,
tmp_dir: Path):
- op_kwargs: dict[str, Any] = dict(self.op_kwargs)
- if self.templates_dict:
- op_kwargs["templates_dict"] = self.templates_dict
- input_path = tmp_dir / "script.in"
- output_path = tmp_dir / "script.out"
- string_args_path = tmp_dir / "string_args.txt"
- script_path = tmp_dir / "script.py"
- termination_log_path = tmp_dir / "termination.log"
-
- self._write_args(input_path)
- self._write_string_args(string_args_path)
- write_python_script(
- jinja_context={
- "op_args": self.op_args,
- "op_kwargs": op_kwargs,
- "expect_airflow": self.expect_airflow,
- "pickling_library": self.pickling_library.__name__,
- "python_callable": self.python_callable.__name__,
- "python_callable_source": self.get_python_source(),
- },
- filename=os.fspath(script_path),
-
render_template_as_native_obj=self.dag.render_template_as_native_obj,
- )
- # For cached venv we need to make sure that the termination log does
not exist
- # Before process starts (it could be a left-over from a previous run)
- if termination_log_path.exists():
- termination_log_path.unlink()
- try:
- execute_in_subprocess(
- cmd=[
- os.fspath(python_path),
- os.fspath(script_path),
- os.fspath(input_path),
- os.fspath(output_path),
- os.fspath(string_args_path),
- os.fspath(termination_log_path),
- ]
+ def _execute_python_callable_in_subprocess(self, python_path: Path):
+ with TemporaryDirectory(prefix="venv-call") as tmp:
+ tmp_dir = Path(tmp)
+ op_kwargs: dict[str, Any] = dict(self.op_kwargs)
+ if self.templates_dict:
+ op_kwargs["templates_dict"] = self.templates_dict
+ input_path = tmp_dir / "script.in"
+ output_path = tmp_dir / "script.out"
+ string_args_path = tmp_dir / "string_args.txt"
+ script_path = tmp_dir / "script.py"
+ termination_log_path = tmp_dir / "termination.log"
+
+ self._write_args(input_path)
+ self._write_string_args(string_args_path)
+ write_python_script(
+ jinja_context={
+ "op_args": self.op_args,
+ "op_kwargs": op_kwargs,
+ "expect_airflow": self.expect_airflow,
+ "pickling_library": self.pickling_library.__name__,
+ "python_callable": self.python_callable.__name__,
+ "python_callable_source": self.get_python_source(),
+ },
+ filename=os.fspath(script_path),
+
render_template_as_native_obj=self.dag.render_template_as_native_obj,
)
- except subprocess.CalledProcessError as e:
- if e.returncode in self.skip_on_exit_code:
- raise AirflowSkipException(f"Process exited with code
{e.returncode}. Skipping.")
- elif termination_log_path.exists() and
termination_log_path.stat().st_size > 0:
- error_msg = f"Process returned non-zero exit status
{e.returncode}.\n"
- with open(termination_log_path) as file:
- error_msg += file.read()
- raise AirflowException(error_msg) from None
- else:
- raise
- return self._read_result(output_path)
+ try:
+ execute_in_subprocess(
+ cmd=[
+ os.fspath(python_path),
+ os.fspath(script_path),
+ os.fspath(input_path),
+ os.fspath(output_path),
+ os.fspath(string_args_path),
+ os.fspath(termination_log_path),
+ ]
+ )
+ except subprocess.CalledProcessError as e:
+ if e.returncode in self.skip_on_exit_code:
+ raise AirflowSkipException(f"Process exited with code
{e.returncode}. Skipping.")
+ elif termination_log_path.exists() and
termination_log_path.stat().st_size > 0:
+ error_msg = f"Process returned non-zero exit status
{e.returncode}.\n"
+ with open(termination_log_path) as file:
+ error_msg += file.read()
+ raise AirflowException(error_msg) from None
+ else:
+ raise
+
+ return self._read_result(output_path)
def determine_kwargs(self, context: Mapping[str, Any]) -> Mapping[str,
Any]:
return KeywordParameters.determine(self.python_callable, self.op_args,
context).serializing()
@@ -704,13 +703,13 @@ class
PythonVirtualenvOperator(_BasePythonVirtualenvOperator):
if self.venv_cache_path:
venv_path =
self._ensure_venv_cache_exists(Path(self.venv_cache_path))
python_path = venv_path / "bin" / "python"
- return self._execute_python_callable_in_subprocess(python_path,
venv_path)
+ return self._execute_python_callable_in_subprocess(python_path)
with TemporaryDirectory(prefix="venv") as tmp_dir:
tmp_path = Path(tmp_dir)
self._prepare_venv(tmp_path)
python_path = tmp_path / "bin" / "python"
- result = self._execute_python_callable_in_subprocess(python_path,
tmp_path)
+ result = self._execute_python_callable_in_subprocess(python_path)
return result
def _iter_serializable_context_keys(self):
@@ -848,9 +847,7 @@ class ExternalPythonOperator(_BasePythonVirtualenvOperator):
f"Sys version: {sys.version_info}. "
f"Virtual environment version:
{python_version_as_list_of_strings}"
)
- with TemporaryDirectory(prefix="tmd") as tmp_dir:
- tmp_path = Path(tmp_dir)
- return self._execute_python_callable_in_subprocess(python_path,
tmp_path)
+ return self._execute_python_callable_in_subprocess(python_path)
def _get_python_version_from_environment(self) -> list[str]:
try:
diff --git a/tests/operators/test_python.py b/tests/operators/test_python.py
index cb2fd16a45..bfb1180353 100644
--- a/tests/operators/test_python.py
+++ b/tests/operators/test_python.py
@@ -25,7 +25,6 @@ import sys
import warnings
from collections import namedtuple
from datetime import date, datetime, timedelta
-from pathlib import Path
from subprocess import CalledProcessError
from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING, Generator
@@ -999,38 +998,6 @@ class
TestPythonVirtualenvOperator(BaseTestPythonVirtualenvOperator):
with TemporaryDirectory(prefix="pytest_venv_1234") as tmp_dir:
self.run_as_task(f, venv_cache_path=tmp_dir, op_args=[4])
- def test_no_side_effect_of_caching_and_termination_log(self):
- def termination_log(a):
- import sys
- from pathlib import Path
-
- assert "pytest_venv_1234" in sys.executable
- venv_cache_dir_name = Path(sys.executable).parent.parent.name
- raise Exception(f"Should produce termination log. Subdir =
{venv_cache_dir_name}")
-
- def no_termination_log(a):
- import sys
-
- assert "pytest_venv_1234" in sys.executable
- raise SystemExit(1)
-
- with TemporaryDirectory(prefix="pytest_venv_1234") as tmp_dir:
- with pytest.raises(AirflowException, match="Should produce
termination log") as exc:
- self.run_as_task(termination_log, venv_cache_path=tmp_dir,
op_args=[4])
- venv_dir_cache = exc.value.args[0].split(" ")[-1]
- termination_log_path = Path(tmp_dir) / venv_dir_cache /
"termination.log"
- assert termination_log_path.exists()
- assert "Should produce termination log" in
termination_log_path.read_text()
- clear_db_runs()
-
- # termination log from previous run should not produce side
effects in another task
- # Using the same cached venv
-
- assert termination_log_path.exists()
- with pytest.raises(CalledProcessError):
- self.run_as_task(no_termination_log, venv_cache_path=tmp_dir,
op_args=[4])
- assert not termination_log_path.exists()
-
# This tests might take longer than default 60 seconds as it is
serializing a lot of
# context using dill (which is slow apparently).
@pytest.mark.execution_timeout(120)