This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 10ae2b3bf56 Refactor DAG deactivation logic (#57233)
10ae2b3bf56 is described below

commit 10ae2b3bf5619d3387eacc1c1355282e16100e26
Author: Kalyan R <[email protected]>
AuthorDate: Sat Oct 25 22:39:37 2025 +0530

    Refactor DAG deactivation logic (#57233)
---
 airflow-core/src/airflow/dag_processing/manager.py |  7 +-
 airflow-core/src/airflow/models/dag.py             |  7 +-
 .../tests/unit/dag_processing/test_manager.py      | 91 ++++++++++++++++++++++
 3 files changed, 102 insertions(+), 3 deletions(-)

diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py
index 84a99a480a9..52ba1e8246b 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -636,12 +636,15 @@ class DagFileProcessorManager(LoggingMixin):
                     rel_filelocs.append(str(rel_sub_path))
 
         with create_session() as session:
-            DagModel.deactivate_deleted_dags(
+            any_deactivated = DagModel.deactivate_deleted_dags(
                 bundle_name=bundle_name,
                 rel_filelocs=rel_filelocs,
                 session=session,
             )
-            remove_references_to_deleted_dags(session=session)
+            # Only run cleanup if we actually deactivated any DAGs
+            # This avoids unnecessary DELETE queries in the common case where no DAGs were deleted
+            if any_deactivated:
+                remove_references_to_deleted_dags(session=session)
 
     def print_stats(self, known_files: dict[str, set[DagFileInfo]]):
         """Occasionally print out stats about how fast the files are getting processed."""
diff --git a/airflow-core/src/airflow/models/dag.py b/airflow-core/src/airflow/models/dag.py
index 7cceac5e9ee..41123e80caa 100644
--- a/airflow-core/src/airflow/models/dag.py
+++ b/airflow-core/src/airflow/models/dag.py
@@ -553,13 +553,14 @@ class DagModel(Base):
         bundle_name: str,
         rel_filelocs: list[str],
         session: Session = NEW_SESSION,
-    ) -> None:
+    ) -> bool:
         """
         Set ``is_active=False`` on the DAGs for which the DAG files have been removed.
 
         :param bundle_name: bundle for filelocs
         :param rel_filelocs: relative filelocs for bundle
         :param session: ORM Session
+        :return: True if any DAGs were marked as stale, False otherwise
         """
         log.debug("Deactivating DAGs (for which DAG files are deleted) from %s table ", cls.__tablename__)
         dag_models = session.scalars(
@@ -575,9 +576,13 @@ class DagModel(Base):
             )
         )
 
+        any_deactivated = False
         for dm in dag_models:
             if dm.relative_fileloc not in rel_filelocs:
                 dm.is_stale = True
+                any_deactivated = True
+
+        return any_deactivated
 
     @classmethod
     def dags_needing_dagruns(cls, session: Session) -> tuple[Query, dict[str, datetime]]:
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py
index 205eaa4aaed..b7c3f23c6f4 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -743,6 +743,97 @@ class TestDagFileProcessorManager:
         # and the DAG from test_dag2.py is deactivated
         assert session.get(DagModel, "test_dag2").is_stale is True
 
+    @pytest.mark.parametrize(
+        "rel_filelocs, expected_return, expected_dag1_stale, expected_dag2_stale",
+        [
+            pytest.param(
+                ["test_dag1.py"],  # Only dag1 present, dag2 deleted
+                True,  # Should return True
+                False,  # dag1 should not be stale
+                True,  # dag2 should be stale
+                id="dags_deactivated",
+            ),
+            pytest.param(
+                ["test_dag1.py", "test_dag2.py"],  # Both files present
+                False,  # Should return False
+                False,  # dag1 should not be stale
+                False,  # dag2 should not be stale
+                id="no_dags_deactivated",
+            ),
+        ],
+    )
+    def test_deactivate_deleted_dags_return_value(
+        self, dag_maker, session, rel_filelocs, expected_return, expected_dag1_stale, expected_dag2_stale
+    ):
+        """Test that DagModel.deactivate_deleted_dags returns correct boolean value."""
+        with dag_maker("test_dag1") as dag1:
+            dag1.relative_fileloc = "test_dag1.py"
+        with dag_maker("test_dag2") as dag2:
+            dag2.relative_fileloc = "test_dag2.py"
+        dag_maker.sync_dagbag_to_db()
+
+        any_deactivated = DagModel.deactivate_deleted_dags(
+            bundle_name="dag_maker",
+            rel_filelocs=rel_filelocs,
+            session=session,
+        )
+
+        assert any_deactivated is expected_return
+        assert session.get(DagModel, "test_dag1").is_stale is expected_dag1_stale
+        assert session.get(DagModel, "test_dag2").is_stale is expected_dag2_stale
+
+    @pytest.mark.parametrize(
+        "active_files, should_call_cleanup",
+        [
+            pytest.param(
+                [
+                    DagFileInfo(
+                        bundle_name="dag_maker",
+                        rel_path=Path("test_dag1.py"),
+                        bundle_path=TEST_DAGS_FOLDER,
+                    ),
+                    # test_dag2.py is deleted
+                ],
+                True,  # Should call cleanup
+                id="dags_deactivated",
+            ),
+            pytest.param(
+                [
+                    DagFileInfo(
+                        bundle_name="dag_maker",
+                        rel_path=Path("test_dag1.py"),
+                        bundle_path=TEST_DAGS_FOLDER,
+                    ),
+                    DagFileInfo(
+                        bundle_name="dag_maker",
+                        rel_path=Path("test_dag2.py"),
+                        bundle_path=TEST_DAGS_FOLDER,
+                    ),
+                ],
+                False,  # Should NOT call cleanup
+                id="no_dags_deactivated",
+            ),
+        ],
+    )
+    @mock.patch("airflow.dag_processing.manager.remove_references_to_deleted_dags")
+    def test_manager_deactivate_deleted_dags_cleanup_behavior(
+        self, mock_remove_references, dag_maker, session, active_files, should_call_cleanup
+    ):
+        """Test that manager conditionally calls remove_references_to_deleted_dags based on whether DAGs were deactivated."""
+        with dag_maker("test_dag1") as dag1:
+            dag1.relative_fileloc = "test_dag1.py"
+        with dag_maker("test_dag2") as dag2:
+            dag2.relative_fileloc = "test_dag2.py"
+        dag_maker.sync_dagbag_to_db()
+
+        manager = DagFileProcessorManager(max_runs=1)
+        manager.deactivate_deleted_dags("dag_maker", active_files)
+
+        if should_call_cleanup:
+            mock_remove_references.assert_called_once()
+        else:
+            mock_remove_references.assert_not_called()
+
     @conf_vars({("core", "load_examples"): "False"})
     def test_fetch_callbacks_from_database(self, configure_testing_dag_bundle):
         """Test _fetch_callbacks returns callbacks ordered by priority_weight desc."""

Reply via email to