This is an automated email from the ASF dual-hosted git repository.

shahar1 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1622eccbfaf Dag processor: reduce file-queue dedup from O(N²) to O(N) with OrderedDict (#67750)
1622eccbfaf is described below

commit 1622eccbfaf4f75dd8e9f33b7b8ceac7758ae283
Author: Shahar Epstein <[email protected]>
AuthorDate: Wed Jun 3 07:24:27 2026 +0300

    Dag processor: reduce file-queue dedup from O(N²) to O(N) with OrderedDict (#67750)
---
 airflow-core/src/airflow/dag_processing/manager.py | 30 ++++-----
 .../tests/unit/dag_processing/test_manager.py      | 72 +++++++++++-----------
 2 files changed, 51 insertions(+), 51 deletions(-)

diff --git a/airflow-core/src/airflow/dag_processing/manager.py 
b/airflow-core/src/airflow/dag_processing/manager.py
index 691237cc46e..fc00b6730c8 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -19,7 +19,6 @@
 
 from __future__ import annotations
 
-import contextlib
 import functools
 import gc
 import inspect
@@ -31,7 +30,7 @@ import signal
 import sys
 import time
 import zipfile
-from collections import defaultdict, deque
+from collections import OrderedDict, defaultdict
 from dataclasses import dataclass, field
 from datetime import datetime, timedelta
 from operator import attrgetter, itemgetter
@@ -237,7 +236,7 @@ class DagFileProcessorManager(LoggingMixin):
     heartbeat: Callable[[], None] = attrs.field(default=lambda: None)
     """An overridable heartbeat called once every time around the loop"""
 
-    _file_queue: deque[DagFileInfo] = attrs.field(factory=deque, init=False)
+    _file_queue: OrderedDict[DagFileInfo, None] = 
attrs.field(factory=OrderedDict, init=False)
     _file_stats: dict[DagFileInfo, DagFileStat] = attrs.field(
         factory=lambda: defaultdict(DagFileStat), init=False
     )
@@ -1067,7 +1066,7 @@ class DagFileProcessorManager(LoggingMixin):
     def purge_removed_files_from_queue(self, present: set[DagFileInfo]):
         """Remove from queue any files no longer observed locally."""
         present_keys = {file.presence_key for file in present}
-        self._file_queue = deque(x for x in self._file_queue if x.presence_key 
in present_keys)
+        self._file_queue = OrderedDict((x, None) for x in self._file_queue if 
x.presence_key in present_keys)
         stats.gauge("dag_processing.file_path_queue_size", 
len(self._file_queue))
 
     def remove_orphaned_file_stats(self, present: set[DagFileInfo]):
@@ -1300,7 +1299,7 @@ class DagFileProcessorManager(LoggingMixin):
     def _start_new_processes(self):
         """Start more processors if we have enough slots and files to 
process."""
         while self._parallelism > len(self._processors) and self._file_queue:
-            file = self._file_queue.popleft()
+            file, _ = self._file_queue.popitem(last=False)
             # Stop creating duplicate processor i.e. processor with the same 
filepath
             if file in self._processors:
                 continue
@@ -1347,7 +1346,7 @@ class DagFileProcessorManager(LoggingMixin):
             sorted_regular_files, _ = self._sort_by_mtime(regular_files)
 
             # Put callback files at the front, then sorted regular files
-            self._file_queue = deque(callback_files + sorted_regular_files)
+            self._file_queue = OrderedDict.fromkeys(callback_files + 
sorted_regular_files)
 
     def _sort_by_mtime(self, files: Iterable[DagFileInfo]):
         file_stats_by_presence_key = {file.presence_key: stat for file, stat 
in self._file_stats.items()}
@@ -1512,17 +1511,18 @@ class DagFileProcessorManager(LoggingMixin):
         """Add stuff to the back or front of the file queue, unless it's 
already present."""
         if mode == "frontprio":
             for file in files:
-                # Try removing the file if already present
-                with contextlib.suppress(ValueError):
-                    self._file_queue.remove(file)
-                # enqueue file to the start of the queue.
-                self._file_queue.appendleft(file)
+                self._file_queue.pop(file, None)
+                self._file_queue[file] = None
+                self._file_queue.move_to_end(file, last=False)
         elif mode == "front":
-            new_files = list(f for f in files if f not in self._file_queue)
-            self._file_queue.extendleft(new_files)
+            for file in files:
+                if file not in self._file_queue:
+                    self._file_queue[file] = None
+                    self._file_queue.move_to_end(file, last=False)
         elif mode == "back":
-            new_files = list(f for f in files if f not in self._file_queue)
-            self._file_queue.extend(new_files)
+            for file in files:
+                if file not in self._file_queue:
+                    self._file_queue[file] = None
         else:
             assert_never(mode)
 
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py 
b/airflow-core/tests/unit/dag_processing/test_manager.py
index 33aaa828888..941d090f229 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -28,7 +28,7 @@ import signal
 import textwrap
 import time
 import zipfile
-from collections import defaultdict, deque
+from collections import OrderedDict, defaultdict
 from datetime import datetime, timedelta
 from pathlib import Path
 from socket import socket, socketpair
@@ -372,7 +372,7 @@ class TestDagFileProcessorManager:
         file_1 = DagFileInfo(bundle_name="testing", 
rel_path=Path("file_1.py"), bundle_path=TEST_DAGS_FOLDER)
         file_2 = DagFileInfo(bundle_name="testing", 
rel_path=Path("file_2.py"), bundle_path=TEST_DAGS_FOLDER)
         file_3 = DagFileInfo(bundle_name="testing", 
rel_path=Path("file_3.py"), bundle_path=TEST_DAGS_FOLDER)
-        manager._file_queue = deque([file_1, file_2, file_3])
+        manager._file_queue = OrderedDict.fromkeys([file_1, file_2, file_3])
 
         # Mock that only one processor exists. This processor runs with 
'file_1'
         manager._processors[file_1] = MagicMock()
@@ -388,7 +388,7 @@ class TestDagFileProcessorManager:
 
         assert file_1 in manager._processors.keys()
         assert file_2 in manager._processors.keys()
-        assert deque([file_3]) == manager._file_queue
+        assert OrderedDict.fromkeys([file_3]) == manager._file_queue
 
     def 
test_handle_removed_files_when_processor_file_path_not_in_new_file_paths(self):
         """Ensure processors and file stats are removed when the file path is 
not in the new file paths"""
@@ -438,21 +438,21 @@ class TestDagFileProcessorManager:
         versioned_file = _get_versioned_file_info("callbacks.py")
         present_file = _get_file_infos(["callbacks.py"])[0]
 
-        manager._file_queue = deque([versioned_file])
+        manager._file_queue = OrderedDict.fromkeys([versioned_file])
 
         manager.purge_removed_files_from_queue(present={present_file})
 
-        assert manager._file_queue == deque([versioned_file])
+        assert manager._file_queue == OrderedDict.fromkeys([versioned_file])
 
     def 
test_purge_removed_files_drops_versioned_callback_file_when_truly_absent(self):
         manager = DagFileProcessorManager(max_runs=1)
         versioned_file = _get_versioned_file_info("callbacks.py")
 
-        manager._file_queue = deque([versioned_file])
+        manager._file_queue = OrderedDict.fromkeys([versioned_file])
 
         manager.purge_removed_files_from_queue(present=set())
 
-        assert manager._file_queue == deque()
+        assert manager._file_queue == OrderedDict()
 
     def 
test_terminate_orphan_processes_keeps_versioned_callback_processor_when_unversioned_file_is_present(
         self,
@@ -511,9 +511,9 @@ class TestDagFileProcessorManager:
 
         manager = DagFileProcessorManager(max_runs=1)
         known_files = {"some-bundle": set(dag_files)}
-        assert manager._file_queue == deque()
+        assert manager._file_queue == OrderedDict()
         manager.prepare_file_queue(known_files=known_files)
-        assert manager._file_queue == deque(ordered_dag_files)
+        assert manager._file_queue == OrderedDict.fromkeys(ordered_dag_files)
 
     @conf_vars({("dag_processor", "file_parsing_sort_mode"): 
"random_seeded_by_host"})
     def test_files_sorted_random_seeded_by_host(self):
@@ -522,10 +522,10 @@ class TestDagFileProcessorManager:
         known_files = {"anything": f_infos}
         manager = DagFileProcessorManager(max_runs=1)
 
-        assert manager._file_queue == deque()
+        assert manager._file_queue == OrderedDict()
         manager.prepare_file_queue(known_files=known_files)  # using list over 
test for reproducibility
         random.Random(get_hostname()).shuffle(f_infos)
-        expected = deque(f_infos)
+        expected = OrderedDict.fromkeys(f_infos)
         assert manager._file_queue == expected
 
         # Verify running it again produces same order
@@ -548,7 +548,7 @@ class TestDagFileProcessorManager:
 
         manager = DagFileProcessorManager(max_runs=1)
 
-        assert manager._file_queue == deque()
+        assert manager._file_queue == OrderedDict()
         manager.prepare_file_queue(known_files={"any": set(dag_files)})
         ordered_files = _get_file_infos(
             [
@@ -558,7 +558,7 @@ class TestDagFileProcessorManager:
                 "file_2-ss=2.0.py",
             ]
         )
-        assert manager._file_queue == deque(ordered_files)
+        assert manager._file_queue == OrderedDict.fromkeys(ordered_files)
 
     @conf_vars({("dag_processor", "file_parsing_sort_mode"): "modified_time"})
     @mock.patch("airflow.utils.file.os.path.getmtime", new=mock_get_mtime)
@@ -570,7 +570,7 @@ class TestDagFileProcessorManager:
         manager = DagFileProcessorManager(max_runs=1)
         manager.prepare_file_queue(known_files={"any": set(file_infos)})
         ordered_files = _get_file_infos(["file_2-ss=3.0.py", 
"file_3-ss=2.0.py"])
-        assert manager._file_queue == deque(ordered_files)
+        assert manager._file_queue == OrderedDict.fromkeys(ordered_files)
 
     @conf_vars({("dag_processor", "file_parsing_sort_mode"): "modified_time"})
     @mock.patch("airflow.utils.file.os.path.getmtime", new=mock_get_mtime)
@@ -597,7 +597,7 @@ class TestDagFileProcessorManager:
                 "file_4-ss=1.0.py",
             ]
         )
-        assert manager._file_queue == deque(ordered_files)
+        assert manager._file_queue == OrderedDict.fromkeys(ordered_files)
 
     def test_add_new_files_to_queue_behavior(self):
         """
@@ -615,7 +615,7 @@ class TestDagFileProcessorManager:
 
         # Setup:
         # file_1 is already in the queue
-        manager._file_queue = deque([file_1])
+        manager._file_queue = OrderedDict.fromkeys([file_1])
 
         # file_3 is currently being processed
         manager._processors[file_3] = MagicMock()
@@ -641,7 +641,7 @@ class TestDagFileProcessorManager:
         parsed_versioned_file = _get_versioned_file_info("file_4.py")
         new_file = _get_file_infos(["file_2.py"])[0]
 
-        manager._file_queue = deque([queued_versioned_file])
+        manager._file_queue = OrderedDict.fromkeys([queued_versioned_file])
         manager._processors[processed_versioned_file] = MagicMock()
         manager._file_stats[parsed_versioned_file] = DagFileStat(num_dags=1)
 
@@ -679,7 +679,7 @@ class TestDagFileProcessorManager:
 
         # Populate queue with unsorted files
         # Queue: [file_1 (100), file_2 (200)]
-        manager._file_queue = deque([dag_files[0], dag_files[1]])
+        manager._file_queue = OrderedDict.fromkeys([dag_files[0], 
dag_files[1]])
 
         manager._resort_file_queue()
 
@@ -697,7 +697,7 @@ class TestDagFileProcessorManager:
         manager = DagFileProcessorManager(max_runs=1)
 
         # Populate queue in non-alphabetical order
-        manager._file_queue = deque([file_b, file_a])
+        manager._file_queue = OrderedDict.fromkeys([file_b, file_a])
 
         manager._resort_file_queue()
 
@@ -727,7 +727,7 @@ class TestDagFileProcessorManager:
         manager = DagFileProcessorManager(max_runs=1)
 
         # Queue order: callback_1, callback_2, regular_1, regular_2
-        manager._file_queue = deque([dag_files[0], dag_files[1], dag_files[2], 
dag_files[3]])
+        manager._file_queue = OrderedDict.fromkeys([dag_files[0], 
dag_files[1], dag_files[2], dag_files[3]])
 
         # Both callback files have pending callbacks
         manager._callback_to_execute[dag_files[0]] = [MagicMock()]
@@ -762,22 +762,22 @@ class TestDagFileProcessorManager:
             dag_file: DagFileStat(1, 0, last_finish_time, 1.0, 1, 1),
         }
         with time_machine.travel(freezed_base_time):
-            assert manager._file_queue == deque()
+            assert manager._file_queue == OrderedDict()
             # File Path Queue will be empty as the "modified time" < "last 
finish time"
             manager.prepare_file_queue(known_files=known_files)
-            assert manager._file_queue == deque()
+            assert manager._file_queue == OrderedDict()
 
         # Simulate the DAG modification by using modified_time which is greater
         # than the last_parse_time but still less than now - 
min_file_process_interval
         file_1_new_mtime = freezed_base_time - timedelta(seconds=5)
         file_1_new_mtime_ts = file_1_new_mtime.timestamp()
         with time_machine.travel(freezed_base_time):
-            assert manager._file_queue == deque()
+            assert manager._file_queue == OrderedDict()
             # File Path Queue will be empty as the "modified time" < "last 
finish time"
             mock_getmtime.side_effect = [file_1_new_mtime_ts]
             manager.prepare_file_queue(known_files=known_files)
             # Check that file is added to the queue even though file was just 
recently passed
-            assert manager._file_queue == deque([dag_file])
+            assert manager._file_queue == OrderedDict.fromkeys([dag_file])
             assert last_finish_time < file_1_new_mtime
             assert (
                 manager._file_process_interval
@@ -794,7 +794,7 @@ class TestDagFileProcessorManager:
 
         manager.prepare_file_queue(known_files={"testing": {known_file}})
 
-        assert manager._file_queue == deque()
+        assert manager._file_queue == OrderedDict()
 
     @conf_vars({("dag_processor", "file_parsing_sort_mode"): "alphabetical"})
     def 
test_prepare_file_queue_skips_file_when_versioned_stat_is_at_run_limit(self):
@@ -806,7 +806,7 @@ class TestDagFileProcessorManager:
 
         manager.prepare_file_queue(known_files={"testing": {known_file}})
 
-        assert manager._file_queue == deque()
+        assert manager._file_queue == OrderedDict()
 
     @conf_vars({("dag_processor", "file_parsing_sort_mode"): "alphabetical"})
     def 
test_prepare_file_queue_skips_recently_processed_file_with_versioned_stats(self):
@@ -821,7 +821,7 @@ class TestDagFileProcessorManager:
 
         manager.prepare_file_queue(known_files={"testing": {known_file}})
 
-        assert manager._file_queue == deque()
+        assert manager._file_queue == OrderedDict()
 
     @conf_vars({("dag_processor", "file_parsing_sort_mode"): "modified_time"})
     @mock.patch("airflow.utils.file.os.path.getmtime")
@@ -843,7 +843,7 @@ class TestDagFileProcessorManager:
             mock_getmtime.side_effect = [(freezed_base_time - 
timedelta(seconds=5)).timestamp()]
             manager.prepare_file_queue(known_files=known_files)
 
-        assert manager._file_queue == deque([known_file])
+        assert manager._file_queue == OrderedDict.fromkeys([known_file])
         assert known_file not in manager._file_stats
         assert versioned_file in manager._file_stats
 
@@ -864,9 +864,9 @@ class TestDagFileProcessorManager:
 
         manager = DagFileProcessorManager(max_runs=1)
         manager._dag_bundles = list(DagBundlesManager().get_all_dag_bundles())
-        manager._file_queue = deque([file2, file1])
+        manager._file_queue = OrderedDict.fromkeys([file2, file1])
         manager._queue_requested_files_for_parsing()
-        assert manager._file_queue == deque([file1, file2])
+        assert manager._file_queue == OrderedDict.fromkeys([file1, file2])
         assert manager._force_refresh_bundles == {"dags-folder"}
         with create_session() as session2:
             parsing_request_after = session2.get(DagPriorityParsingRequest, 
parsing_request.id)
@@ -888,7 +888,7 @@ class TestDagFileProcessorManager:
         manager = DagFileProcessorManager(max_runs=1)
         manager._dag_bundles = list(DagBundlesManager().get_all_dag_bundles())
         manager._queue_requested_files_for_parsing()
-        assert manager._file_queue == deque([file1])
+        assert manager._file_queue == OrderedDict.fromkeys([file1])
         with create_session() as session2:
             parsing_request_after = 
session2.scalars(select(DagPriorityParsingRequest)).all()
         assert len(parsing_request_after) == 1
@@ -907,11 +907,11 @@ class TestDagFileProcessorManager:
                 return [file1]
 
         manager = ApiBackedManager(max_runs=1)
-        manager._file_queue = deque([file2])
+        manager._file_queue = OrderedDict.fromkeys([file2])
 
         manager._queue_requested_files_for_parsing()
 
-        assert manager._file_queue == deque([file1, file2])
+        assert manager._file_queue == OrderedDict.fromkeys([file1, file2])
         assert manager._force_refresh_bundles == {"dags-folder"}
 
     def test_request_bundle_refresh_marks_bundles_for_refresh(self):
@@ -1758,7 +1758,7 @@ class TestDagFileProcessorManager:
             manager._add_callback_to_queue(dag2_req1)
 
             # then - requests should be in manager's queue, with dag2 ahead of 
dag1 (because it was added last)
-            assert manager._file_queue == deque([dag2_path, dag1_path])
+            assert manager._file_queue == OrderedDict.fromkeys([dag2_path, 
dag1_path])
             assert set(manager._callback_to_execute.keys()) == {
                 dag1_path,
                 dag2_path,
@@ -1766,12 +1766,12 @@ class TestDagFileProcessorManager:
             assert manager._callback_to_execute[dag2_path] == [dag2_req1]
 
             # update the queue, although the callback is registered
-            assert manager._file_queue == deque([dag2_path, dag1_path])
+            assert manager._file_queue == OrderedDict.fromkeys([dag2_path, 
dag1_path])
 
             # when
             manager._add_callback_to_queue(dag1_req2)
             # Since dag1_req2 is same as dag1_req1, we now have 2 items in 
file_path_queue
-            assert manager._file_queue == deque([dag2_path, dag1_path])
+            assert manager._file_queue == OrderedDict.fromkeys([dag2_path, 
dag1_path])
             assert manager._callback_to_execute[dag1_path] == [
                 dag1_req1,
                 dag1_req2,

Reply via email to