This is an automated email from the ASF dual-hosted git repository.

ephraimbuddy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 34ef2503e1d Add purge-warnings and bundle-refresh override seams to DagFileProcessorManager (#66107)
34ef2503e1d is described below

commit 34ef2503e1d698decb85971af79ad43260d04f6e
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Thu May 7 14:19:22 2026 +0100

    Add purge-warnings and bundle-refresh override seams to DagFileProcessorManager (#66107)
    
    * Add purge-warnings and bundle-refresh override seams to DagFileProcessorManager
    
    The projected AIP-92 DB-to-API swap needs DagFileProcessorManager subclasses
    that route operations through the API server instead of the metadata DB.
    Two spots in the current implementation are awkward to override cleanly:
    
    - _run_parsing_loop calls DagWarning.purge_inactive_dag_warnings() directly,
      forcing subclasses to reimplement the parsing loop just to redirect or
      skip the cleanup. Promote it to an overridable instance method,
      purge_inactive_dag_warnings(), so subclasses can forward it to an API or
      no-op it without touching _run_parsing_loop.
    
    - Bundle refreshes triggered by external events (API callbacks, coordinator
      messages) had to mutate the private _force_refresh_bundles set directly.
      Expose request_bundle_refresh() as the public seam so event handlers can
      mark a bundle for refresh without reaching into private state.
    
    * Make request_bundle_refresh accept multiple bundle names
    
    * Harden DAG bundle refresh seam
---
 airflow-core/src/airflow/dag_processing/manager.py | 26 ++++++++++++--
 .../tests/unit/dag_processing/test_manager.py      | 41 ++++++++++++++++++++++
 2 files changed, 65 insertions(+), 2 deletions(-)

diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py
index 8d497ca7508..6e663da93a9 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -478,7 +478,7 @@ class DagFileProcessorManager(LoggingMixin):
                 self._add_callback_to_queue(callback)
             self._scan_stale_dags()
             self._cleanup_stale_bundle_versions()
-            DagWarning.purge_inactive_dag_warnings()
+            self.purge_inactive_dag_warnings()
 
             # Update number of loop iteration.
             self._num_run += 1
@@ -525,7 +525,7 @@ class DagFileProcessorManager(LoggingMixin):
         """Queue any files requested for parsing as requested by users via 
UI/API."""
         files = self.claim_priority_files()
         self._add_files_to_queue(files, mode="frontprio")
-        self._force_refresh_bundles |= {file.bundle_name for file in files}
+        self.request_bundle_refresh(file.bundle_name for file in files)
         if self._force_refresh_bundles:
             self.log.info("Bundles being force refreshed: %s", ", 
".join(self._force_refresh_bundles))
 
@@ -537,6 +537,19 @@ class DagFileProcessorManager(LoggingMixin):
         """
         return self._claim_priority_files()
 
+    def request_bundle_refresh(self, bundle_names: str | Iterable[str]) -> 
None:
+        """
+        Request that the given bundles be refreshed on the next refresh tick.
+
+        Use this from event handlers reacting to external signals to mark
+        bundles as needing refresh; the next call to 
:meth:`_refresh_dag_bundles`
+        will not skip them via :meth:`should_skip_refresh`.
+        """
+        if isinstance(bundle_names, str):
+            self._force_refresh_bundles.add(bundle_names)
+            return
+        self._force_refresh_bundles.update(bundle_names)
+
     def should_skip_refresh(
         self,
         *,
@@ -691,6 +704,15 @@ class DagFileProcessorManager(LoggingMixin):
             values["version"] = version
         session.execute(update(DagBundleModel).where(DagBundleModel.name == 
bundle_name).values(**values))
 
+    def purge_inactive_dag_warnings(self) -> None:
+        """
+        Purge warnings for inactive/stale DAGs.
+
+        Default implementation deletes records from the metadata DB; override 
to
+        source warnings from an API or skip the cleanup entirely.
+        """
+        DagWarning.purge_inactive_dag_warnings()
+
     def _refresh_dag_bundles(self, known_files: dict[str, set[DagFileInfo]]):
         """Refresh DAG bundles, if required."""
         now = timezone.utcnow()
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py
index c10e73473bf..2296d1b428b 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -733,6 +733,24 @@ class TestDagFileProcessorManager:
         assert manager._file_queue == deque([file1, file2])
         assert manager._force_refresh_bundles == {"dags-folder"}
 
+    def test_request_bundle_refresh_marks_bundles_for_refresh(self):
+        """`request_bundle_refresh` adds the bundles to the force-refresh 
set."""
+        manager = DagFileProcessorManager(max_runs=1)
+        assert manager._force_refresh_bundles == set()
+
+        manager.request_bundle_refresh(["bundleone", "bundletwo"])
+        manager.request_bundle_refresh(["bundleone"])  # idempotent
+
+        assert manager._force_refresh_bundles == {"bundleone", "bundletwo"}
+
+    def test_request_bundle_refresh_accepts_single_bundle_name(self):
+        """`request_bundle_refresh` treats a string as one bundle name, not an 
iterable."""
+        manager = DagFileProcessorManager(max_runs=1)
+
+        manager.request_bundle_refresh("bundleone")
+
+        assert manager._force_refresh_bundles == {"bundleone"}
+
     @pytest.mark.usefixtures("testing_dag_bundle")
     def test_scan_stale_dags(self, session):
         """
@@ -2189,6 +2207,29 @@ class TestDagFileProcessorManager:
             sync_mock.assert_not_called()
             assert [b.name for b in manager._dag_bundles] == ["testing"]
 
+    def test_purge_inactive_dag_warnings_delegates_to_dagwarning(self):
+        """Default `purge_inactive_dag_warnings` calls 
`DagWarning.purge_inactive_dag_warnings`."""
+        manager = DagFileProcessorManager(max_runs=1)
+        with mock.patch(
+            
"airflow.dag_processing.manager.DagWarning.purge_inactive_dag_warnings"
+        ) as purge_mock:
+            manager.purge_inactive_dag_warnings()
+        purge_mock.assert_called_once_with()
+
+    def test_run_parsing_loop_uses_overridable_purge(self, tmp_path, 
configure_testing_dag_bundle):
+        """`_run_parsing_loop` calls the overridable 
`purge_inactive_dag_warnings` seam."""
+        with configure_testing_dag_bundle(tmp_path):
+            manager = DagFileProcessorManager(max_runs=1)
+            with (
+                mock.patch.object(manager, "purge_inactive_dag_warnings") as 
purge_mock,
+                mock.patch(
+                    
"airflow.dag_processing.manager.DagWarning.purge_inactive_dag_warnings"
+                ) as direct_mock,
+            ):
+                manager.run()
+            purge_mock.assert_called()
+            direct_mock.assert_not_called()
+
     @mock.patch("airflow.dag_processing.manager.stats.gauge")
     def test_stats_total_parse_time(self, statsd_gauge_mock, tmp_path, 
configure_testing_dag_bundle):
         key = "dag_processing.total_parse_time"

Reply via email to