This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7609021ce9 Deactivate DAGs deleted from within zipfiles (#30608)
7609021ce9 is described below
commit 7609021ce93d61f2101f5e8cdc126bb8369d334b
Author: Louis Mackie <[email protected]>
AuthorDate: Thu Apr 13 05:10:17 2023 +0100
Deactivate DAGs deleted from within zipfiles (#30608)
---
airflow/dag_processing/manager.py | 2 +-
airflow/models/dag.py | 6 ++----
tests/dag_processing/test_job_runner.py | 35 +++++++++++++++++++++++++++++++++
3 files changed, 38 insertions(+), 5 deletions(-)
diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 2cb16a3475..78493b8b55 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -791,7 +791,7 @@ class DagFileProcessorManager(LoggingMixin):
alive_dag_filelocs=dag_filelocs,
processor_subdir=self.get_dag_directory(),
)
- DagModel.deactivate_deleted_dags(self._file_paths)
+ DagModel.deactivate_deleted_dags(dag_filelocs)
from airflow.models.dagcode import DagCode
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 1d3c2538b8..8b259cd3e8 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -99,7 +99,6 @@ from airflow.utils import timezone
from airflow.utils.dag_cycle_tester import check_cycle
from airflow.utils.dates import cron_presets, date_range as utils_date_range
from airflow.utils.decorators import fixup_decorator_warning_stack
-from airflow.utils.file import correct_maybe_zipped
from airflow.utils.helpers import at_most_one, exactly_one, validate_key
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import NEW_SESSION, provide_session
@@ -3375,9 +3374,8 @@ class DagModel(Base):
dag_models = session.query(cls).all()
for dag_model in dag_models:
- if dag_model.fileloc is not None:
- if correct_maybe_zipped(dag_model.fileloc) not in alive_dag_filelocs:
- dag_model.is_active = False
+ if dag_model.fileloc is not None and dag_model.fileloc not in alive_dag_filelocs:
+ dag_model.is_active = False
else:
continue
diff --git a/tests/dag_processing/test_job_runner.py b/tests/dag_processing/test_job_runner.py
index 87d321f932..4758b140aa 100644
--- a/tests/dag_processing/test_job_runner.py
+++ b/tests/dag_processing/test_job_runner.py
@@ -964,6 +964,41 @@ class TestDagProcessorJobRunner:
assert SerializedDagModel.has_dag("test_zip_dag")
# assert code not deleted
assert DagCode.has_dag(dag.fileloc)
+ # assert dag still active
+ assert dag.get_is_active()
+
+ def test_refresh_dags_dir_deactivates_deleted_zipped_dags(self, tmpdir):
+ """Test DagProcessorJobRunner._refresh_dag_dir method"""
+ manager = DagProcessorJobRunner(
+ job=Job(),
+ processor=DagFileProcessorManager(
+ dag_directory=TEST_DAG_FOLDER,
+ max_runs=1,
+ processor_timeout=timedelta(days=365),
+ signal_conn=MagicMock(),
+ dag_ids=[],
+ pickle_dags=False,
+ async_mode=True,
+ ),
+ )
+ dagbag = DagBag(dag_folder=tmpdir, include_examples=False)
+ zipped_dag_path = os.path.join(TEST_DAGS_FOLDER, "test_zip.zip")
+ dagbag.process_file(zipped_dag_path)
+ dag = dagbag.get_dag("test_zip_dag")
+ dag.sync_to_db()
+ SerializedDagModel.write_dag(dag)
+ manager.processor.last_dag_dir_refresh_time = timezone.utcnow() - timedelta(minutes=10)
+
+ # Mock might_contain_dag to mimic deleting the python file from the zip
+ with mock.patch("airflow.dag_processing.manager.might_contain_dag", return_value=False):
+ manager.processor._refresh_dag_dir()
+
+ # Assert dag removed from SDM
+ assert not SerializedDagModel.has_dag("test_zip_dag")
+ # assert code deleted
+ assert not DagCode.has_dag(dag.fileloc)
+ # assert dag deactivated
+ assert not dag.get_is_active()
@conf_vars(
{