This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new dc02968418d Only grab DAGs from bundles being parsed in stale DAG 
cleanup (#47519)
dc02968418d is described below

commit dc02968418d40d5279a7479429cf7031c50a073d
Author: Jed Cunningham <[email protected]>
AuthorDate: Sat Mar 8 10:16:43 2025 -0700

    Only grab DAGs from bundles being parsed in stale DAG cleanup (#47519)
    
    Instead of bringing back every DAG, let's only select the DAGs that are
    in the bundles that the DAG processor is parsing. We won't deactivate
    others anyways, so it's just extra work for the db and DAG processor.
---
 airflow/dag_processing/manager.py    | 4 ++--
 tests/dag_processing/test_manager.py | 3 +++
 2 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/airflow/dag_processing/manager.py 
b/airflow/dag_processing/manager.py
index 8578ff40a83..13e77795060 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -272,14 +272,14 @@ class DagFileProcessorManager(LoggingMixin):
     ):
         """Detect and deactivate DAGs which are no longer present in files."""
         to_deactivate = set()
+        bundle_names = {b.name for b in self._dag_bundles}
         query = select(
             DagModel.dag_id,
             DagModel.bundle_name,
             DagModel.fileloc,
             DagModel.last_parsed_time,
             DagModel.relative_fileloc,
-        ).where(DagModel.is_active)
-        # TODO: AIP-66 by bundle!
+        ).where(DagModel.is_active, DagModel.bundle_name.in_(bundle_names))
         dags_parsed = session.execute(query)
 
         for dag in dags_parsed:
diff --git a/tests/dag_processing/test_manager.py 
b/tests/dag_processing/test_manager.py
index 4e605e02da3..12ccf60b113 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -420,6 +420,9 @@ class TestDagFileProcessorManager:
             max_runs=1,
             processor_timeout=10 * 60,
         )
+        bundle = MagicMock()
+        bundle.name = "testing"
+        manager._dag_bundles = [bundle]
 
         test_dag_path = DagFileInfo(
             bundle_name="testing",

Reply via email to