This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new dc02968418d Only grab DAGs from bundles being parsed in stale DAG
cleanup (#47519)
dc02968418d is described below
commit dc02968418d40d5279a7479429cf7031c50a073d
Author: Jed Cunningham <[email protected]>
AuthorDate: Sat Mar 8 10:16:43 2025 -0700
Only grab DAGs from bundles being parsed in stale DAG cleanup (#47519)
Instead of bringing back every DAG, let's only select the DAGs that are
in the bundles that the DAG processor is parsing. We won't deactivate
others anyways, so it's just extra work for the db and DAG processor.
---
airflow/dag_processing/manager.py | 4 ++--
tests/dag_processing/test_manager.py | 3 +++
2 files changed, 5 insertions(+), 2 deletions(-)
diff --git a/airflow/dag_processing/manager.py
b/airflow/dag_processing/manager.py
index 8578ff40a83..13e77795060 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -272,14 +272,14 @@ class DagFileProcessorManager(LoggingMixin):
):
"""Detect and deactivate DAGs which are no longer present in files."""
to_deactivate = set()
+ bundle_names = {b.name for b in self._dag_bundles}
query = select(
DagModel.dag_id,
DagModel.bundle_name,
DagModel.fileloc,
DagModel.last_parsed_time,
DagModel.relative_fileloc,
- ).where(DagModel.is_active)
- # TODO: AIP-66 by bundle!
+ ).where(DagModel.is_active, DagModel.bundle_name.in_(bundle_names))
dags_parsed = session.execute(query)
for dag in dags_parsed:
diff --git a/tests/dag_processing/test_manager.py
b/tests/dag_processing/test_manager.py
index 4e605e02da3..12ccf60b113 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -420,6 +420,9 @@ class TestDagFileProcessorManager:
max_runs=1,
processor_timeout=10 * 60,
)
+ bundle = MagicMock()
+ bundle.name = "testing"
+ manager._dag_bundles = [bundle]
test_dag_path = DagFileInfo(
bundle_name="testing",