This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 83e4b5ffcdd85f8bdfc4ffd18d4f95a086aa4dd1
Author: Jed Cunningham <[email protected]>
AuthorDate: Fri Jan 2 00:51:45 2026 -0700

    Optimized DAG processing queue order on bundle refresh (#60003)
    
    When file_parsing_sort_mode is set to modified_time, the DAG file queue
    is now re-sorted by modification time whenever a bundle refresh happens.
    This ensures that existing files in the queue are prioritized correctly
    based on their recency, rather than retaining their old order. New files
    discovered during the refresh are still added to the head of the queue
    for immediate processing.
    
    (cherry picked from commit bbf234edb613610e4215fa190b8a70a8f23cd1ae)
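
    For illustration only (not part of this commit): a minimal, self-contained
    sketch of the resort behavior described above, using plain paths and
    os.path.getmtime as stand-ins for the manager's DagFileInfo entries and
    its _sort_by_mtime helper.

        import os
        from collections import deque

        def resort_by_mtime(queue: deque) -> deque:
            """Re-order queued files so the most recently modified parse first."""
            mtimes = {}
            for path in queue:
                try:
                    mtimes[path] = os.path.getmtime(path)
                except OSError:
                    # Assumption: a file that vanished since the refresh is
                    # simply skipped; the real manager may handle this differently.
                    continue
            # Newest first, mirroring file_parsing_sort_mode = "modified_time".
            return deque(sorted(mtimes, key=mtimes.get, reverse=True))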
---
 airflow-core/src/airflow/dag_processing/manager.py | 24 ++++++++---
 .../tests/unit/dag_processing/test_manager.py      | 46 ++++++++++++++++++++++
 2 files changed, 65 insertions(+), 5 deletions(-)

diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py
index fed39d77675..bdfd5220346 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -225,6 +225,10 @@ class DagFileProcessorManager(LoggingMixin):
     _force_refresh_bundles: set[str] = attrs.field(factory=set, init=False)
     """List of bundles that need to be force refreshed in the next loop"""
 
+    _file_parsing_sort_mode: str = attrs.field(
+        factory=_config_get_factory("dag_processor", "file_parsing_sort_mode")
+    )
+
     _api_server: InProcessExecutionAPI = attrs.field(init=False, factory=InProcessExecutionAPI)
     """API server to interact with Metadata DB"""
 
@@ -523,12 +527,14 @@ class DagFileProcessorManager(LoggingMixin):
 
         self._bundles_last_refreshed = now_seconds
 
+        any_refreshed = False
         for bundle in self._dag_bundles:
             # TODO: AIP-66 handle errors in the case of incomplete cloning? And test this.
             #  What if the cloning/refreshing took too long(longer than the dag processor timeout)
             if not bundle.is_initialized:
                 try:
                     bundle.initialize()
+                    any_refreshed = True
                 except AirflowException as e:
                     self.log.exception("Error initializing bundle %s: %s", 
bundle.name, e)
                     continue
@@ -563,6 +569,7 @@ class DagFileProcessorManager(LoggingMixin):
 
                 try:
                     bundle.refresh()
+                    any_refreshed = True
                 except Exception:
                     self.log.exception("Error refreshing bundle %s", 
bundle.name)
                     continue
@@ -599,7 +606,6 @@ class DagFileProcessorManager(LoggingMixin):
             }
 
             known_files[bundle.name] = found_files
-            self.handle_removed_files(known_files=known_files)
 
             self.deactivate_deleted_dags(bundle_name=bundle.name, present=found_files)
             self.clear_orphaned_import_errors(
@@ -607,6 +613,10 @@ class DagFileProcessorManager(LoggingMixin):
                 observed_filelocs={str(x.rel_path) for x in found_files},  # todo: make relative
             )
 
+        if any_refreshed:
+            self.handle_removed_files(known_files=known_files)
+            self._resort_file_queue()
+
     def _find_files_in_bundle(self, bundle: BaseDagBundle) -> list[Path]:
         """Get relative paths for dag files from bundle dir."""
         # Build up a list of Python files that could contain DAGs
@@ -960,6 +970,11 @@ class DagFileProcessorManager(LoggingMixin):
                     self.log.info("Adding new file %s to parsing queue", file)
                     self._file_queue.appendleft(file)
 
+    def _resort_file_queue(self):
+        if self._file_parsing_sort_mode == "modified_time" and self._file_queue:
+            files, _ = self._sort_by_mtime(self._file_queue)
+            self._file_queue = deque(files)
+
     def _sort_by_mtime(self, files: Iterable[DagFileInfo]):
         files_with_mtime: dict[DagFileInfo, float] = {}
         changed_recently = set()
@@ -1002,7 +1017,6 @@ class DagFileProcessorManager(LoggingMixin):
         now = timezone.utcnow()
 
         # Sort the file paths by the parsing order mode
-        list_mode = conf.get("dag_processor", "file_parsing_sort_mode")
         recently_processed = set()
         files = []
 
@@ -1013,11 +1027,11 @@ class DagFileProcessorManager(LoggingMixin):
                     recently_processed.add(file)
 
         changed_recently: set[DagFileInfo] = set()
-        if list_mode == "modified_time":
+        if self._file_parsing_sort_mode == "modified_time":
             files, changed_recently = self._sort_by_mtime(files=files)
-        elif list_mode == "alphabetical":
+        elif self._file_parsing_sort_mode == "alphabetical":
             files.sort(key=attrgetter("rel_path"))
-        elif list_mode == "random_seeded_by_host":
+        elif self._file_parsing_sort_mode == "random_seeded_by_host":
             # Shuffle the list seeded by hostname so multiple DAG processors can work on different
             # set of files. Since we set the seed, the sort order will remain same per host
             random.Random(get_hostname()).shuffle(files)
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py
index 456d925935b..5bf965f30a5 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -348,6 +348,52 @@ class TestDagFileProcessorManager:
         )
         assert manager._file_queue == deque(ordered_files)
 
+    @conf_vars({("dag_processor", "file_parsing_sort_mode"): "modified_time"})
+    @mock.patch("airflow.utils.file.os.path.getmtime", new=mock_get_mtime)
+    def test_resort_file_queue_by_mtime(self):
+        """
+        Check that existing files in the queue are re-sorted by mtime when calling _resort_file_queue,
+        if sort mode is modified_time.
+        """
+        # Prepare some files with mtimes
+        files_with_mtime = [
+            ("file_1.py", 100.0),
+            ("file_2.py", 200.0),
+        ]
+        filenames = encode_mtime_in_filename(files_with_mtime)
+        dag_files = _get_file_infos(filenames)
+        # dag_files[0] -> file_1 (mtime 100)
+        # dag_files[1] -> file_2 (mtime 200)
+
+        manager = DagFileProcessorManager(max_runs=1)
+
+        # Populate queue with unsorted files
+        # Queue: [file_1 (100), file_2 (200)]
+        manager._file_queue = deque([dag_files[0], dag_files[1]])
+
+        manager._resort_file_queue()
+
+        # Verify resort happened: [file_2 (200), file_1 (100)]
+        assert list(manager._file_queue) == [dag_files[1], dag_files[0]]
+
+    @conf_vars({("dag_processor", "file_parsing_sort_mode"): "alphabetical"})
+    def test_resort_file_queue_does_nothing_when_alphabetical(self):
+        """
+        Check that _resort_file_queue does NOT change the order if sort mode is alphabetical.
+        """
+        file_a = DagFileInfo(bundle_name="testing", rel_path=Path("a.py"), bundle_path=TEST_DAGS_FOLDER)
+        file_b = DagFileInfo(bundle_name="testing", rel_path=Path("b.py"), bundle_path=TEST_DAGS_FOLDER)
+
+        manager = DagFileProcessorManager(max_runs=1)
+
+        # Populate queue in non-alphabetical order
+        manager._file_queue = deque([file_b, file_a])
+
+        manager._resort_file_queue()
+
+        # Order should remain unchanged
+        assert list(manager._file_queue) == [file_b, file_a]
+
     @conf_vars({("dag_processor", "file_parsing_sort_mode"): "modified_time"})
     @mock.patch("airflow.utils.file.os.path.getmtime")
     def test_recently_modified_file_is_parsed_with_mtime_mode(self, mock_getmtime):

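For reference (not part of the diff above), a hedged sketch of pinning the
sort mode for local experiments; conf is the same airflow.configuration
object whose conf.get() call the diff replaces with a cached attribute:

    from airflow.configuration import conf

    # Section and key match the conf.get("dag_processor",
    # "file_parsing_sort_mode") call removed above; "modified_time"
    # enables the queue resort on bundle refresh.
    conf.set("dag_processor", "file_parsing_sort_mode", "modified_time")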