This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 749e53def4 remove stale serialized dags (#22917)
749e53def4 is described below

commit 749e53def43055225a2e5d09596af7821d91b4ac
Author: Ping Zhang <[email protected]>
AuthorDate: Thu May 12 12:01:47 2022 -0700

    remove stale serialized dags (#22917)
---
 airflow/dag_processing/manager.py    | 10 +++++++++-
 tests/dag_processing/test_manager.py | 32 ++++++++++++++++++++++++++------
 2 files changed, 35 insertions(+), 7 deletions(-)

diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 4cb11cff82..f16a913725 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -481,7 +481,11 @@ class DagFileProcessorManager(LoggingMixin):
 
     @provide_session
     def _deactivate_stale_dags(self, session=None):
-        """Detects DAGs which are no longer present in files and deactivate 
them."""
+        """
+        Detects DAGs which are no longer present in files
+
+        Deactivate them and remove them in the serialized_dag table
+        """
         now = timezone.utcnow()
         elapsed_time_since_refresh = (now - 
self.last_deactivate_stale_dags_time).total_seconds()
         if elapsed_time_since_refresh > self.deactivate_stale_dags_interval:
@@ -514,6 +518,10 @@ class DagFileProcessorManager(LoggingMixin):
                 if deactivated:
                     self.log.info("Deactivated %i DAGs which are no longer 
present in file.", deactivated)
 
+                for dag_id in to_deactivate:
+                    SerializedDagModel.remove_dag(dag_id)
+                    self.log.info("Deleted DAG %s in serialized_dag table", 
dag_id)
+
             self.last_deactivate_stale_dags_time = timezone.utcnow()
 
     def _run_parsing_loop(self):
diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py
index 6a65116d51..ad613d877d 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -34,6 +34,7 @@ from unittest.mock import MagicMock, PropertyMock
 
 import pytest
 from freezegun import freeze_time
+from sqlalchemy import func
 
 from airflow.callbacks.callback_requests import CallbackRequest, 
DagCallbackRequest, SlaCallbackRequest
 from airflow.config_templates.airflow_local_settings import 
DEFAULT_LOGGING_CONFIG
@@ -476,6 +477,7 @@ class TestDagFileProcessorManager:
             dag = dagbag.get_dag('test_example_bash_operator')
             dag.last_parsed_time = timezone.utcnow()
             dag.sync_to_db()
+            SerializedDagModel.write_dag(dag)
 
             # Add DAG to the file_parsing_stats
             stat = DagFileStat(
@@ -488,18 +490,36 @@ class TestDagFileProcessorManager:
             manager._file_paths = [test_dag_path]
             manager._file_stats[test_dag_path] = stat
 
-            active_dags = (
-                session.query(DagModel).filter(DagModel.is_active, 
DagModel.fileloc == test_dag_path).all()
+            active_dag_count = (
+                session.query(func.count(DagModel.dag_id))
+                .filter(DagModel.is_active, DagModel.fileloc == test_dag_path)
+                .scalar()
             )
-            assert len(active_dags) == 1
+            assert active_dag_count == 1
+
+            serialized_dag_count = (
+                session.query(func.count(SerializedDagModel.dag_id))
+                .filter(SerializedDagModel.fileloc == test_dag_path)
+                .scalar()
+            )
+            assert serialized_dag_count == 1
 
             manager._file_stats[test_dag_path] = stat
             manager._deactivate_stale_dags()
-            active_dags = (
-                session.query(DagModel).filter(DagModel.is_active, 
DagModel.fileloc == test_dag_path).all()
+
+            active_dag_count = (
+                session.query(func.count(DagModel.dag_id))
+                .filter(DagModel.is_active, DagModel.fileloc == test_dag_path)
+                .scalar()
             )
+            assert active_dag_count == 0
 
-            assert len(active_dags) == 0
+            serialized_dag_count = (
+                session.query(func.count(SerializedDagModel.dag_id))
+                .filter(SerializedDagModel.fileloc == test_dag_path)
+                .scalar()
+            )
+            assert serialized_dag_count == 0
 
     @mock.patch(
         
"airflow.dag_processing.processor.DagFileProcessorProcess.waitable_handle", 
new_callable=PropertyMock

Reply via email to