This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c70f298ec3 Handle left-over termination logs from previous runs with same cache (#35252)
c70f298ec3 is described below

commit c70f298ec3ae65f510ea5b48c6568b1734b58c2d
Author: Jarek Potiuk <[email protected]>
AuthorDate: Sun Oct 29 14:58:14 2023 +0100

    Handle left-over termination logs from previous runs with same cache (#35252)
    
    When PythonVirtualenvOperator and related operators use a cached venv,
    the generated script and the termination log are written to the venv
    directory. This was fine for the script, because the venv is locked and
    the script is overwritten every time the venv is used, but nothing
    removed a termination log left over from an earlier run.
    
    The termination log stays in the venv after the task completes (which
    is useful for diagnostics). However, when a subsequent task using the
    same venv failed without raising a regular exception - for example by
    calling sys.exit() - the termination log from the previous task was
    found, and the task raised AirflowException carrying the previous
    task's message instead of CalledProcessError.
    
    This PR fixes it by deleting any termination log left in the venv
    before the PythonVirtualenvOperator subprocess is started.
---
 airflow/operators/python.py    |  6 +++++-
 tests/operators/test_python.py | 33 +++++++++++++++++++++++++++++++++
 2 files changed, 38 insertions(+), 1 deletion(-)

diff --git a/airflow/operators/python.py b/airflow/operators/python.py
index 0eadb5441a..28c3adedc7 100644
--- a/airflow/operators/python.py
+++ b/airflow/operators/python.py
@@ -431,6 +431,7 @@ class _BasePythonVirtualenvOperator(PythonOperator, metaclass=ABCMeta):
         string_args_path = tmp_dir / "string_args.txt"
         script_path = tmp_dir / "script.py"
         termination_log_path = tmp_dir / "termination.log"
+
         self._write_args(input_path)
         self._write_string_args(string_args_path)
         write_python_script(
@@ -445,7 +446,10 @@ class _BasePythonVirtualenvOperator(PythonOperator, metaclass=ABCMeta):
             filename=os.fspath(script_path),
             
render_template_as_native_obj=self.dag.render_template_as_native_obj,
         )
-
+        # For cached venv we need to make sure that the termination log does 
not exist
+        # Before process starts (it could be a left-over from a previous run)
+        if termination_log_path.exists():
+            termination_log_path.unlink()
         try:
             execute_in_subprocess(
                 cmd=[
diff --git a/tests/operators/test_python.py b/tests/operators/test_python.py
index bfb1180353..cb2fd16a45 100644
--- a/tests/operators/test_python.py
+++ b/tests/operators/test_python.py
@@ -25,6 +25,7 @@ import sys
 import warnings
 from collections import namedtuple
 from datetime import date, datetime, timedelta
+from pathlib import Path
 from subprocess import CalledProcessError
 from tempfile import TemporaryDirectory
 from typing import TYPE_CHECKING, Generator
@@ -998,6 +999,38 @@ class TestPythonVirtualenvOperator(BaseTestPythonVirtualenvOperator):
         with TemporaryDirectory(prefix="pytest_venv_1234") as tmp_dir:
             self.run_as_task(f, venv_cache_path=tmp_dir, op_args=[4])
 
+    def test_no_side_effect_of_caching_and_termination_log(self):
+        def termination_log(a):
+            import sys
+            from pathlib import Path
+
+            assert "pytest_venv_1234" in sys.executable
+            venv_cache_dir_name = Path(sys.executable).parent.parent.name
+            raise Exception(f"Should produce termination log. Subdir = 
{venv_cache_dir_name}")
+
+        def no_termination_log(a):
+            import sys
+
+            assert "pytest_venv_1234" in sys.executable
+            raise SystemExit(1)
+
+        with TemporaryDirectory(prefix="pytest_venv_1234") as tmp_dir:
+            with pytest.raises(AirflowException, match="Should produce 
termination log") as exc:
+                self.run_as_task(termination_log, venv_cache_path=tmp_dir, 
op_args=[4])
+            venv_dir_cache = exc.value.args[0].split(" ")[-1]
+            termination_log_path = Path(tmp_dir) / venv_dir_cache / 
"termination.log"
+            assert termination_log_path.exists()
+            assert "Should produce termination log" in 
termination_log_path.read_text()
+            clear_db_runs()
+
+            # termination log from previous run should not produce side 
effects in another task
+            # Using the same cached venv
+
+            assert termination_log_path.exists()
+            with pytest.raises(CalledProcessError):
+                self.run_as_task(no_termination_log, venv_cache_path=tmp_dir, 
op_args=[4])
+            assert not termination_log_path.exists()
+
     # This tests might take longer than default 60 seconds as it is 
serializing a lot of
     # context using dill (which is slow apparently).
     @pytest.mark.execution_timeout(120)

Reply via email to