This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7609021ce9 Deactivate DAGs deleted from within zipfiles (#30608)
7609021ce9 is described below

commit 7609021ce93d61f2101f5e8cdc126bb8369d334b
Author: Louis Mackie <[email protected]>
AuthorDate: Thu Apr 13 05:10:17 2023 +0100

    Deactivate DAGs deleted from within zipfiles (#30608)
---
 airflow/dag_processing/manager.py       |  2 +-
 airflow/models/dag.py                   |  6 ++----
 tests/dag_processing/test_job_runner.py | 35 +++++++++++++++++++++++++++++++++
 3 files changed, 38 insertions(+), 5 deletions(-)

diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 2cb16a3475..78493b8b55 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -791,7 +791,7 @@ class DagFileProcessorManager(LoggingMixin):
                 alive_dag_filelocs=dag_filelocs,
                 processor_subdir=self.get_dag_directory(),
             )
-            DagModel.deactivate_deleted_dags(self._file_paths)
+            DagModel.deactivate_deleted_dags(dag_filelocs)
 
             from airflow.models.dagcode import DagCode
 
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 1d3c2538b8..8b259cd3e8 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -99,7 +99,6 @@ from airflow.utils import timezone
 from airflow.utils.dag_cycle_tester import check_cycle
 from airflow.utils.dates import cron_presets, date_range as utils_date_range
 from airflow.utils.decorators import fixup_decorator_warning_stack
-from airflow.utils.file import correct_maybe_zipped
 from airflow.utils.helpers import at_most_one, exactly_one, validate_key
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.session import NEW_SESSION, provide_session
@@ -3375,9 +3374,8 @@ class DagModel(Base):
 
         dag_models = session.query(cls).all()
         for dag_model in dag_models:
-            if dag_model.fileloc is not None:
-                if correct_maybe_zipped(dag_model.fileloc) not in alive_dag_filelocs:
-                    dag_model.is_active = False
+            if dag_model.fileloc is not None and dag_model.fileloc not in alive_dag_filelocs:
+                dag_model.is_active = False
             else:
                 continue
 
diff --git a/tests/dag_processing/test_job_runner.py b/tests/dag_processing/test_job_runner.py
index 87d321f932..4758b140aa 100644
--- a/tests/dag_processing/test_job_runner.py
+++ b/tests/dag_processing/test_job_runner.py
@@ -964,6 +964,41 @@ class TestDagProcessorJobRunner:
         assert SerializedDagModel.has_dag("test_zip_dag")
         # assert code not deleted
         assert DagCode.has_dag(dag.fileloc)
+        # assert dag still active
+        assert dag.get_is_active()
+
+    def test_refresh_dags_dir_deactivates_deleted_zipped_dags(self, tmpdir):
+        """Test DagProcessorJobRunner._refresh_dag_dir method"""
+        manager = DagProcessorJobRunner(
+            job=Job(),
+            processor=DagFileProcessorManager(
+                dag_directory=TEST_DAG_FOLDER,
+                max_runs=1,
+                processor_timeout=timedelta(days=365),
+                signal_conn=MagicMock(),
+                dag_ids=[],
+                pickle_dags=False,
+                async_mode=True,
+            ),
+        )
+        dagbag = DagBag(dag_folder=tmpdir, include_examples=False)
+        zipped_dag_path = os.path.join(TEST_DAGS_FOLDER, "test_zip.zip")
+        dagbag.process_file(zipped_dag_path)
+        dag = dagbag.get_dag("test_zip_dag")
+        dag.sync_to_db()
+        SerializedDagModel.write_dag(dag)
+        manager.processor.last_dag_dir_refresh_time = timezone.utcnow() - timedelta(minutes=10)
+
+        # Mock might_contain_dag to mimic deleting the python file from the zip
+        with mock.patch("airflow.dag_processing.manager.might_contain_dag", return_value=False):
+            manager.processor._refresh_dag_dir()
+
+        # Assert dag removed from SDM
+        assert not SerializedDagModel.has_dag("test_zip_dag")
+        # assert code deleted
+        assert not DagCode.has_dag(dag.fileloc)
+        # assert dag deactivated
+        assert not dag.get_is_active()
 
     @conf_vars(
         {

Reply via email to