This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 42b2fee69a195d758279d27c7f13914ff56c418a
Author: Jed Cunningham <[email protected]>
AuthorDate: Tue Jan 6 11:33:04 2026 -0700

    Refactor DAG file queuing and fix redundant processing (#60124)
    
    (cherry picked from commit 3cfe4b924a8a040845df2805138f250346002ab5)
---
 airflow-core/src/airflow/dag_processing/manager.py | 23 +++++++++-----
 .../tests/unit/dag_processing/test_manager.py      | 37 +++++++++++++++++++++-
 2 files changed, 51 insertions(+), 9 deletions(-)

diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py
index bdfd5220346..d325ede7f05 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -373,9 +373,6 @@ class DagFileProcessorManager(LoggingMixin):
                 # cleared all files added as a result of callbacks
                 self.prepare_file_queue(known_files=known_files)
                 self.emit_metrics()
-            else:
-                # if new files found in dag dir, add them
-                self.add_files_to_queue(known_files=known_files)
 
             self._start_new_processes()
 
@@ -616,6 +613,7 @@ class DagFileProcessorManager(LoggingMixin):
         if any_refreshed:
             self.handle_removed_files(known_files=known_files)
             self._resort_file_queue()
+            self._add_new_files_to_queue(known_files=known_files)
 
     def _find_files_in_bundle(self, bundle: BaseDagBundle) -> list[Path]:
         """Get relative paths for dag files from bundle dir."""
@@ -962,13 +960,22 @@ class DagFileProcessorManager(LoggingMixin):
             self._processors[file] = processor
             Stats.gauge("dag_processing.file_path_queue_size", len(self._file_queue))
 
-    def add_files_to_queue(self, known_files: dict[str, set[DagFileInfo]]):
+    def _add_new_files_to_queue(self, known_files: dict[str, set[DagFileInfo]]):
+        """
+        Add new files to the front of the queue.
+
+        A "new" file is a file that has not been processed yet and is not currently being processed.
+        """
+        new_files = []
         for files in known_files.values():
             for file in files:
-                if file not in self._file_stats:  # todo: store stats by bundle also?
-                    # We found new file after refreshing dir. add to parsing queue at start
-                    self.log.info("Adding new file %s to parsing queue", file)
-                    self._file_queue.appendleft(file)
+                # todo: store stats by bundle also?
+                if file not in self._file_stats and file not in self._processors:
+                    new_files.append(file)
+
+        if new_files:
+            self.log.info("Adding %d new files to the front of the queue", len(new_files))
+            self._add_files_to_queue(new_files, True)
 
     def _resort_file_queue(self):
         if self._file_parsing_sort_mode == "modified_time" and self._file_queue:
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py
index 5bf965f30a5..922543add0c 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -337,7 +337,7 @@ class TestDagFileProcessorManager:
         manager.prepare_file_queue(
             known_files={"any": set((*dag_files, *_get_file_infos(["file_4-ss=1.0.py"])))}
         )
-        # manager.add_files_to_queue()
+        # manager._add_new_files_to_queue()
         ordered_files = _get_file_infos(
             [
                 "file_3-ss=4.0.py",
@@ -348,6 +348,41 @@ class TestDagFileProcessorManager:
         )
         assert manager._file_queue == deque(ordered_files)
 
+    def test_add_new_files_to_queue_behavior(self):
+        """
+        Check that _add_new_files_to_queue:
+        1. Adds new files to the front of the queue.
+        2. Skips files that are currently being processed.
+        3. Skips files that have already been processed (in _file_stats).
+        4. Does not re-add files already in the queue.
+        """
+        manager = DagFileProcessorManager(max_runs=1)
+        file_1 = DagFileInfo(bundle_name="testing", rel_path=Path("file_1.py"), bundle_path=TEST_DAGS_FOLDER)
+        file_2 = DagFileInfo(bundle_name="testing", rel_path=Path("file_2.py"), bundle_path=TEST_DAGS_FOLDER)
+        file_3 = DagFileInfo(bundle_name="testing", rel_path=Path("file_3.py"), bundle_path=TEST_DAGS_FOLDER)
+        file_4 = DagFileInfo(bundle_name="testing", rel_path=Path("file_4.py"), bundle_path=TEST_DAGS_FOLDER)
+
+        # Setup:
+        # file_1 is already in the queue
+        manager._file_queue = deque([file_1])
+
+        # file_3 is currently being processed
+        manager._processors[file_3] = MagicMock()
+
+        # file_4 has already been processed
+        manager._file_stats[file_4] = DagFileStat(num_dags=1)
+
+        # known_files contains all four
+        known_files = {"testing": {file_1, file_2, file_3, file_4}}
+
+        manager._add_new_files_to_queue(known_files)
+
+        # file_4 should be ignored (in file_stats)
+        # file_3 should be ignored (processing)
+        # file_2 should be at the front (new)
+        # file_1 should remain (already in queue)
+        assert list(manager._file_queue) == [file_2, file_1]
+
     @conf_vars({("dag_processor", "file_parsing_sort_mode"): "modified_time"})
     @mock.patch("airflow.utils.file.os.path.getmtime", new=mock_get_mtime)
     def test_resort_file_queue_by_mtime(self):

Reply via email to