This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2f48bc9bf8 Bugfix/prevent concurrency with cached venv (#35258)
2f48bc9bf8 is described below

commit 2f48bc9bf825f26e310267e3d798016619c67a2e
Author: Jens Scheffler <[email protected]>
AuthorDate: Mon Oct 30 12:19:31 2023 +0100

    Bugfix/prevent concurrency with cached venv (#35258)
    
    Revert #35252 and ensure venv cache files are created in a dedicated tmp 
folder
---
 airflow/operators/python.py    | 105 ++++++++++++++++++++---------------------
 tests/operators/test_python.py |  33 -------------
 2 files changed, 51 insertions(+), 87 deletions(-)

diff --git a/airflow/operators/python.py b/airflow/operators/python.py
index 28c3adedc7..5a4cf9c2a2 100644
--- a/airflow/operators/python.py
+++ b/airflow/operators/python.py
@@ -422,57 +422,56 @@ class _BasePythonVirtualenvOperator(PythonOperator, 
metaclass=ABCMeta):
         memo[id(self.pickling_library)] = self.pickling_library
         return super().__deepcopy__(memo)
 
-    def _execute_python_callable_in_subprocess(self, python_path: Path, 
tmp_dir: Path):
-        op_kwargs: dict[str, Any] = dict(self.op_kwargs)
-        if self.templates_dict:
-            op_kwargs["templates_dict"] = self.templates_dict
-        input_path = tmp_dir / "script.in"
-        output_path = tmp_dir / "script.out"
-        string_args_path = tmp_dir / "string_args.txt"
-        script_path = tmp_dir / "script.py"
-        termination_log_path = tmp_dir / "termination.log"
-
-        self._write_args(input_path)
-        self._write_string_args(string_args_path)
-        write_python_script(
-            jinja_context={
-                "op_args": self.op_args,
-                "op_kwargs": op_kwargs,
-                "expect_airflow": self.expect_airflow,
-                "pickling_library": self.pickling_library.__name__,
-                "python_callable": self.python_callable.__name__,
-                "python_callable_source": self.get_python_source(),
-            },
-            filename=os.fspath(script_path),
-            
render_template_as_native_obj=self.dag.render_template_as_native_obj,
-        )
-        # For cached venv we need to make sure that the termination log does 
not exist
-        # Before process starts (it could be a left-over from a previous run)
-        if termination_log_path.exists():
-            termination_log_path.unlink()
-        try:
-            execute_in_subprocess(
-                cmd=[
-                    os.fspath(python_path),
-                    os.fspath(script_path),
-                    os.fspath(input_path),
-                    os.fspath(output_path),
-                    os.fspath(string_args_path),
-                    os.fspath(termination_log_path),
-                ]
+    def _execute_python_callable_in_subprocess(self, python_path: Path):
+        with TemporaryDirectory(prefix="venv-call") as tmp:
+            tmp_dir = Path(tmp)
+            op_kwargs: dict[str, Any] = dict(self.op_kwargs)
+            if self.templates_dict:
+                op_kwargs["templates_dict"] = self.templates_dict
+            input_path = tmp_dir / "script.in"
+            output_path = tmp_dir / "script.out"
+            string_args_path = tmp_dir / "string_args.txt"
+            script_path = tmp_dir / "script.py"
+            termination_log_path = tmp_dir / "termination.log"
+
+            self._write_args(input_path)
+            self._write_string_args(string_args_path)
+            write_python_script(
+                jinja_context={
+                    "op_args": self.op_args,
+                    "op_kwargs": op_kwargs,
+                    "expect_airflow": self.expect_airflow,
+                    "pickling_library": self.pickling_library.__name__,
+                    "python_callable": self.python_callable.__name__,
+                    "python_callable_source": self.get_python_source(),
+                },
+                filename=os.fspath(script_path),
+                
render_template_as_native_obj=self.dag.render_template_as_native_obj,
             )
-        except subprocess.CalledProcessError as e:
-            if e.returncode in self.skip_on_exit_code:
-                raise AirflowSkipException(f"Process exited with code 
{e.returncode}. Skipping.")
-            elif termination_log_path.exists() and 
termination_log_path.stat().st_size > 0:
-                error_msg = f"Process returned non-zero exit status 
{e.returncode}.\n"
-                with open(termination_log_path) as file:
-                    error_msg += file.read()
-                raise AirflowException(error_msg) from None
-            else:
-                raise
 
-        return self._read_result(output_path)
+            try:
+                execute_in_subprocess(
+                    cmd=[
+                        os.fspath(python_path),
+                        os.fspath(script_path),
+                        os.fspath(input_path),
+                        os.fspath(output_path),
+                        os.fspath(string_args_path),
+                        os.fspath(termination_log_path),
+                    ]
+                )
+            except subprocess.CalledProcessError as e:
+                if e.returncode in self.skip_on_exit_code:
+                    raise AirflowSkipException(f"Process exited with code 
{e.returncode}. Skipping.")
+                elif termination_log_path.exists() and 
termination_log_path.stat().st_size > 0:
+                    error_msg = f"Process returned non-zero exit status 
{e.returncode}.\n"
+                    with open(termination_log_path) as file:
+                        error_msg += file.read()
+                    raise AirflowException(error_msg) from None
+                else:
+                    raise
+
+            return self._read_result(output_path)
 
     def determine_kwargs(self, context: Mapping[str, Any]) -> Mapping[str, 
Any]:
         return KeywordParameters.determine(self.python_callable, self.op_args, 
context).serializing()
@@ -704,13 +703,13 @@ class 
PythonVirtualenvOperator(_BasePythonVirtualenvOperator):
         if self.venv_cache_path:
             venv_path = 
self._ensure_venv_cache_exists(Path(self.venv_cache_path))
             python_path = venv_path / "bin" / "python"
-            return self._execute_python_callable_in_subprocess(python_path, 
venv_path)
+            return self._execute_python_callable_in_subprocess(python_path)
 
         with TemporaryDirectory(prefix="venv") as tmp_dir:
             tmp_path = Path(tmp_dir)
             self._prepare_venv(tmp_path)
             python_path = tmp_path / "bin" / "python"
-            result = self._execute_python_callable_in_subprocess(python_path, 
tmp_path)
+            result = self._execute_python_callable_in_subprocess(python_path)
             return result
 
     def _iter_serializable_context_keys(self):
@@ -848,9 +847,7 @@ class ExternalPythonOperator(_BasePythonVirtualenvOperator):
                 f"Sys version: {sys.version_info}. "
                 f"Virtual environment version: 
{python_version_as_list_of_strings}"
             )
-        with TemporaryDirectory(prefix="tmd") as tmp_dir:
-            tmp_path = Path(tmp_dir)
-            return self._execute_python_callable_in_subprocess(python_path, 
tmp_path)
+        return self._execute_python_callable_in_subprocess(python_path)
 
     def _get_python_version_from_environment(self) -> list[str]:
         try:
diff --git a/tests/operators/test_python.py b/tests/operators/test_python.py
index cb2fd16a45..bfb1180353 100644
--- a/tests/operators/test_python.py
+++ b/tests/operators/test_python.py
@@ -25,7 +25,6 @@ import sys
 import warnings
 from collections import namedtuple
 from datetime import date, datetime, timedelta
-from pathlib import Path
 from subprocess import CalledProcessError
 from tempfile import TemporaryDirectory
 from typing import TYPE_CHECKING, Generator
@@ -999,38 +998,6 @@ class 
TestPythonVirtualenvOperator(BaseTestPythonVirtualenvOperator):
         with TemporaryDirectory(prefix="pytest_venv_1234") as tmp_dir:
             self.run_as_task(f, venv_cache_path=tmp_dir, op_args=[4])
 
-    def test_no_side_effect_of_caching_and_termination_log(self):
-        def termination_log(a):
-            import sys
-            from pathlib import Path
-
-            assert "pytest_venv_1234" in sys.executable
-            venv_cache_dir_name = Path(sys.executable).parent.parent.name
-            raise Exception(f"Should produce termination log. Subdir = 
{venv_cache_dir_name}")
-
-        def no_termination_log(a):
-            import sys
-
-            assert "pytest_venv_1234" in sys.executable
-            raise SystemExit(1)
-
-        with TemporaryDirectory(prefix="pytest_venv_1234") as tmp_dir:
-            with pytest.raises(AirflowException, match="Should produce 
termination log") as exc:
-                self.run_as_task(termination_log, venv_cache_path=tmp_dir, 
op_args=[4])
-            venv_dir_cache = exc.value.args[0].split(" ")[-1]
-            termination_log_path = Path(tmp_dir) / venv_dir_cache / 
"termination.log"
-            assert termination_log_path.exists()
-            assert "Should produce termination log" in 
termination_log_path.read_text()
-            clear_db_runs()
-
-            # termination log from previous run should not produce side 
effects in another task
-            # Using the same cached venv
-
-            assert termination_log_path.exists()
-            with pytest.raises(CalledProcessError):
-                self.run_as_task(no_termination_log, venv_cache_path=tmp_dir, 
op_args=[4])
-            assert not termination_log_path.exists()
-
     # This tests might take longer than default 60 seconds as it is 
serializing a lot of
     # context using dill (which is slow apparently).
     @pytest.mark.execution_timeout(120)

Reply via email to