This is an automated email from the ASF dual-hosted git repository.
vatsrahul1001 pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-2-test by this push:
new 187a3a2af7c [v3-2-test] Mark Dags stale when their bundle is removed
from config (#66948) (#66985)
187a3a2af7c is described below
commit 187a3a2af7c456f05825c387777cb75102e4b149
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon May 18 15:05:15 2026 +0530
[v3-2-test] Mark Dags stale when their bundle is removed from config
(#66948) (#66985)
* Mark Dags stale when their bundle is removed from config
When a Dag bundle is removed from the bundle config, sync_bundles_to_db
flipped the bundle's active flag to False but left its Dags with
is_stale=False. The processor stops parsing files for inactive bundles,
so the time-based check in deactivate_stale_dags never fired for them.
deactivate_stale_dags now reads the set of inactive bundles from the
DagBundleModel table and treats any non-stale Dag whose bundle is
inactive as stale, in addition to the existing last_parsed_time check for
all other Dags. If the bundle reappears in config later, the
existing parse path resets is_stale to False per Dag.
* Apply suggestions from code review
---------
(cherry picked from commit 01be07a4571e58c311048246788f76f837b145fb)
Co-authored-by: Ephraim Anierobi <[email protected]>
Co-authored-by: Wei Lee <[email protected]>
Co-authored-by: Rahul Vats <[email protected]>
---
airflow-core/src/airflow/dag_processing/manager.py | 16 +++++++--
.../tests/unit/dag_processing/test_manager.py | 40 ++++++++++++++++++++++
2 files changed, 54 insertions(+), 2 deletions(-)
diff --git a/airflow-core/src/airflow/dag_processing/manager.py
b/airflow-core/src/airflow/dag_processing/manager.py
index 57ebd56c0e9..d9baaf3270d 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -372,17 +372,29 @@ class DagFileProcessorManager(LoggingMixin):
):
"""Detect and deactivate DAGs which are no longer present in files."""
to_deactivate = set()
- bundle_names = {b.name for b in self._dag_bundles}
+ inactive_bundles = set(
+
session.scalars(select(DagBundleModel.name).where(DagBundleModel.active.is_(False))).all()
+ )
query = select(
DagModel.dag_id,
DagModel.bundle_name,
DagModel.fileloc,
DagModel.last_parsed_time,
DagModel.relative_fileloc,
- ).where(~DagModel.is_stale, DagModel.bundle_name.in_(bundle_names))
+ ).where(~DagModel.is_stale)
dags_parsed = session.execute(query)
for dag in dags_parsed:
+ # Dags whose bundle has been removed from config (bundle no longer
active) are stale —
+ # the processor has stopped parsing their files, so the time-based
check below would never fire.
+ if dag.bundle_name in inactive_bundles:
+ self.log.info(
+ "Deactivating Dag %s. Its bundle %s is no longer active.",
+ dag.dag_id,
+ dag.bundle_name,
+ )
+ to_deactivate.add(dag.dag_id)
+ continue
# When the Dag's last_parsed_time is more than the
stale_dag_threshold older than the
# Dag file's last_finish_time, the Dag is considered stale as has
apparently been removed from the file,
# This is especially relevant for Dag files that generate Dags in
a dynamic manner.
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py
b/airflow-core/tests/unit/dag_processing/test_manager.py
index 15899a3c7ad..06bdc26cb45 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -778,6 +778,46 @@ class TestDagFileProcessorManager:
# SerializedDagModel gives history about Dags
assert serialized_dag_count == 1
+ @pytest.mark.usefixtures("testing_dag_bundle")
+ def test_deactivate_stale_dags_marks_dags_in_inactive_bundles(self,
session):
+ """Dags whose bundle is no longer active should be marked stale even
without a parse signal."""
+ session.add(DagBundleModel(name="gone-bundle"))
+ session.flush()
+ session.execute(
+ DagBundleModel.__table__.update().where(DagBundleModel.name ==
"gone-bundle").values(active=False)
+ )
+ session.add(
+ DagModel(
+ dag_id="dag_in_inactive_bundle",
+ bundle_name="gone-bundle",
+ relative_fileloc="some_file.py",
+ last_parsed_time=timezone.utcnow(),
+ is_stale=False,
+ )
+ )
+ session.add(
+ DagModel(
+ dag_id="dag_in_active_bundle",
+ bundle_name="testing",
+ relative_fileloc="other_file.py",
+ last_parsed_time=timezone.utcnow(),
+ is_stale=False,
+ )
+ )
+ session.flush()
+
+ manager = DagFileProcessorManager(max_runs=1, processor_timeout=10 *
60)
+ manager.deactivate_stale_dags(last_parsed={})
+
+ is_stale_by_dag = dict(
+ session.execute(
+ select(DagModel.dag_id, DagModel.is_stale).where(
+ DagModel.dag_id.in_(["dag_in_inactive_bundle",
"dag_in_active_bundle"])
+ )
+ ).all()
+ )
+ assert is_stale_by_dag == {"dag_in_inactive_bundle": True,
"dag_in_active_bundle": False}
+
@mock.patch("airflow.dag_processing.manager.BundleUsageTrackingManager")
def test_cleanup_stale_bundle_versions_interval(self, mock_bundle_manager):
manager = DagFileProcessorManager(max_runs=1)