This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c70f298ec3 Handle left-over termination logs from previous runs with
same cache (#35252)
c70f298ec3 is described below
commit c70f298ec3ae65f510ea5b48c6568b1734b58c2d
Author: Jarek Potiuk <[email protected]>
AuthorDate: Sun Oct 29 14:58:14 2023 +0100
Handle left-over termination logs from previous runs with same cache
(#35252)
When PythonVirtualenvOperator and friends use a cached venv, the script
and the termination log are stored in the venv. While this was fine for
scripts, because the venv is locked and the scripts are overwritten every
time the venv starts, it was not the same for the termination log.
The termination log remains in the venv after the venv run completes
(which is good for diagnostics). However, when a subsequent task failed
without raising a regular exception, but just by calling sys.exit(), the
termination log from the previous task was found, and that task would
raise AirflowException with the message coming from the previous task
instead of CalledProcessError.
This PR fixes it by deleting the termination log at the start of a
PythonVirtualenv task, when such a termination log is present in
the venv.
---
airflow/operators/python.py | 6 +++++-
tests/operators/test_python.py | 33 +++++++++++++++++++++++++++++++++
2 files changed, 38 insertions(+), 1 deletion(-)
diff --git a/airflow/operators/python.py b/airflow/operators/python.py
index 0eadb5441a..28c3adedc7 100644
--- a/airflow/operators/python.py
+++ b/airflow/operators/python.py
@@ -431,6 +431,7 @@ class _BasePythonVirtualenvOperator(PythonOperator,
metaclass=ABCMeta):
string_args_path = tmp_dir / "string_args.txt"
script_path = tmp_dir / "script.py"
termination_log_path = tmp_dir / "termination.log"
+
self._write_args(input_path)
self._write_string_args(string_args_path)
write_python_script(
@@ -445,7 +446,10 @@ class _BasePythonVirtualenvOperator(PythonOperator,
metaclass=ABCMeta):
filename=os.fspath(script_path),
render_template_as_native_obj=self.dag.render_template_as_native_obj,
)
-
+ # For cached venv we need to make sure that the termination log does
not exist
+ # Before process starts (it could be a left-over from a previous run)
+ if termination_log_path.exists():
+ termination_log_path.unlink()
try:
execute_in_subprocess(
cmd=[
diff --git a/tests/operators/test_python.py b/tests/operators/test_python.py
index bfb1180353..cb2fd16a45 100644
--- a/tests/operators/test_python.py
+++ b/tests/operators/test_python.py
@@ -25,6 +25,7 @@ import sys
import warnings
from collections import namedtuple
from datetime import date, datetime, timedelta
+from pathlib import Path
from subprocess import CalledProcessError
from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING, Generator
@@ -998,6 +999,38 @@ class
TestPythonVirtualenvOperator(BaseTestPythonVirtualenvOperator):
with TemporaryDirectory(prefix="pytest_venv_1234") as tmp_dir:
self.run_as_task(f, venv_cache_path=tmp_dir, op_args=[4])
+ def test_no_side_effect_of_caching_and_termination_log(self):
+ def termination_log(a):
+ import sys
+ from pathlib import Path
+
+ assert "pytest_venv_1234" in sys.executable
+ venv_cache_dir_name = Path(sys.executable).parent.parent.name
+ raise Exception(f"Should produce termination log. Subdir =
{venv_cache_dir_name}")
+
+ def no_termination_log(a):
+ import sys
+
+ assert "pytest_venv_1234" in sys.executable
+ raise SystemExit(1)
+
+ with TemporaryDirectory(prefix="pytest_venv_1234") as tmp_dir:
+ with pytest.raises(AirflowException, match="Should produce
termination log") as exc:
+ self.run_as_task(termination_log, venv_cache_path=tmp_dir,
op_args=[4])
+ venv_dir_cache = exc.value.args[0].split(" ")[-1]
+ termination_log_path = Path(tmp_dir) / venv_dir_cache /
"termination.log"
+ assert termination_log_path.exists()
+ assert "Should produce termination log" in
termination_log_path.read_text()
+ clear_db_runs()
+
+ # termination log from previous run should not produce side
effects in another task
+ # Using the same cached venv
+
+ assert termination_log_path.exists()
+ with pytest.raises(CalledProcessError):
+ self.run_as_task(no_termination_log, venv_cache_path=tmp_dir,
op_args=[4])
+ assert not termination_log_path.exists()
+
# This tests might take longer than default 60 seconds as it is
serializing a lot of
# context using dill (which is slow apparently).
@pytest.mark.execution_timeout(120)