This is an automated email from the ASF dual-hosted git repository.

vatsrahul1001 pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-2-test by this push:
     new 187a3a2af7c [v3-2-test] Mark Dags stale when their bundle is removed from config (#66948) (#66985)
187a3a2af7c is described below

commit 187a3a2af7c456f05825c387777cb75102e4b149
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon May 18 15:05:15 2026 +0530

    [v3-2-test] Mark Dags stale when their bundle is removed from config (#66948) (#66985)
    
    * Mark Dags stale when their bundle is removed from config
    
    When a Dag bundle is removed from the bundle config, sync_bundles_to_db
    flipped the bundle's active flag to False but left its Dags with
    is_stale=False. The processor stops parsing files for inactive bundles,
    so the time-based check in deactivate_stale_dags never fired for them.
    
    deactivate_stale_dags now reads the set of inactive bundles (active=False)
    from the DagBundleModel table and treats any non-stale Dag in one of those
    bundles as stale, in addition to the existing last_parsed_time check for
    Dags in active bundles. The Dag query is no longer filtered to the bundle
    names in self._dag_bundles. If the bundle reappears in config later, the
    existing parse path resets is_stale to False per Dag.
    
    * Apply suggestions from code review
    
    
    
    ---------
    (cherry picked from commit 01be07a4571e58c311048246788f76f837b145fb)
    
    Co-authored-by: Ephraim Anierobi <[email protected]>
    Co-authored-by: Wei Lee <[email protected]>
    Co-authored-by: Rahul Vats <[email protected]>
---
 airflow-core/src/airflow/dag_processing/manager.py | 16 +++++++--
 .../tests/unit/dag_processing/test_manager.py      | 40 ++++++++++++++++++++++
 2 files changed, 54 insertions(+), 2 deletions(-)

diff --git a/airflow-core/src/airflow/dag_processing/manager.py 
b/airflow-core/src/airflow/dag_processing/manager.py
index 57ebd56c0e9..d9baaf3270d 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -372,17 +372,29 @@ class DagFileProcessorManager(LoggingMixin):
     ):
         """Detect and deactivate DAGs which are no longer present in files."""
         to_deactivate = set()
-        bundle_names = {b.name for b in self._dag_bundles}
+        inactive_bundles = set(
+            
session.scalars(select(DagBundleModel.name).where(DagBundleModel.active.is_(False))).all()
+        )
         query = select(
             DagModel.dag_id,
             DagModel.bundle_name,
             DagModel.fileloc,
             DagModel.last_parsed_time,
             DagModel.relative_fileloc,
-        ).where(~DagModel.is_stale, DagModel.bundle_name.in_(bundle_names))
+        ).where(~DagModel.is_stale)
         dags_parsed = session.execute(query)
 
         for dag in dags_parsed:
+            # Dags whose bundle has been removed from config (bundle no longer 
active) are stale —
+            # the processor has stopped parsing their files, so the time-based 
check below would never fire.
+            if dag.bundle_name in inactive_bundles:
+                self.log.info(
+                    "Deactivating Dag %s. Its bundle %s is no longer active.",
+                    dag.dag_id,
+                    dag.bundle_name,
+                )
+                to_deactivate.add(dag.dag_id)
+                continue
             # When the Dag's last_parsed_time is more than the 
stale_dag_threshold older than the
             # Dag file's last_finish_time, the Dag is considered stale as has 
apparently been removed from the file,
             # This is especially relevant for Dag files that generate Dags in 
a dynamic manner.
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py 
b/airflow-core/tests/unit/dag_processing/test_manager.py
index 15899a3c7ad..06bdc26cb45 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -778,6 +778,46 @@ class TestDagFileProcessorManager:
         # SerializedDagModel gives history about Dags
         assert serialized_dag_count == 1
 
+    @pytest.mark.usefixtures("testing_dag_bundle")
+    def test_deactivate_stale_dags_marks_dags_in_inactive_bundles(self, 
session):
+        """Dags whose bundle is no longer active should be marked stale even 
without a parse signal."""
+        session.add(DagBundleModel(name="gone-bundle"))
+        session.flush()
+        session.execute(
+            DagBundleModel.__table__.update().where(DagBundleModel.name == 
"gone-bundle").values(active=False)
+        )
+        session.add(
+            DagModel(
+                dag_id="dag_in_inactive_bundle",
+                bundle_name="gone-bundle",
+                relative_fileloc="some_file.py",
+                last_parsed_time=timezone.utcnow(),
+                is_stale=False,
+            )
+        )
+        session.add(
+            DagModel(
+                dag_id="dag_in_active_bundle",
+                bundle_name="testing",
+                relative_fileloc="other_file.py",
+                last_parsed_time=timezone.utcnow(),
+                is_stale=False,
+            )
+        )
+        session.flush()
+
+        manager = DagFileProcessorManager(max_runs=1, processor_timeout=10 * 
60)
+        manager.deactivate_stale_dags(last_parsed={})
+
+        is_stale_by_dag = dict(
+            session.execute(
+                select(DagModel.dag_id, DagModel.is_stale).where(
+                    DagModel.dag_id.in_(["dag_in_inactive_bundle", 
"dag_in_active_bundle"])
+                )
+            ).all()
+        )
+        assert is_stale_by_dag == {"dag_in_inactive_bundle": True, 
"dag_in_active_bundle": False}
+
     @mock.patch("airflow.dag_processing.manager.BundleUsageTrackingManager")
     def test_cleanup_stale_bundle_versions_interval(self, mock_bundle_manager):
         manager = DagFileProcessorManager(max_runs=1)

Reply via email to