This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch v3-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 42b2fee69a195d758279d27c7f13914ff56c418a Author: Jed Cunningham <[email protected]> AuthorDate: Tue Jan 6 11:33:04 2026 -0700 Refactor DAG file queuing and fix redundant processing (#60124) (cherry picked from commit 3cfe4b924a8a040845df2805138f250346002ab5) --- airflow-core/src/airflow/dag_processing/manager.py | 23 +++++++++----- .../tests/unit/dag_processing/test_manager.py | 37 +++++++++++++++++++++- 2 files changed, 51 insertions(+), 9 deletions(-) diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py index bdfd5220346..d325ede7f05 100644 --- a/airflow-core/src/airflow/dag_processing/manager.py +++ b/airflow-core/src/airflow/dag_processing/manager.py @@ -373,9 +373,6 @@ class DagFileProcessorManager(LoggingMixin): # cleared all files added as a result of callbacks self.prepare_file_queue(known_files=known_files) self.emit_metrics() - else: - # if new files found in dag dir, add them - self.add_files_to_queue(known_files=known_files) self._start_new_processes() @@ -616,6 +613,7 @@ class DagFileProcessorManager(LoggingMixin): if any_refreshed: self.handle_removed_files(known_files=known_files) self._resort_file_queue() + self._add_new_files_to_queue(known_files=known_files) def _find_files_in_bundle(self, bundle: BaseDagBundle) -> list[Path]: """Get relative paths for dag files from bundle dir.""" @@ -962,13 +960,22 @@ class DagFileProcessorManager(LoggingMixin): self._processors[file] = processor Stats.gauge("dag_processing.file_path_queue_size", len(self._file_queue)) - def add_files_to_queue(self, known_files: dict[str, set[DagFileInfo]]): + def _add_new_files_to_queue(self, known_files: dict[str, set[DagFileInfo]]): + """ + Add new files to the front of the queue. + + A "new" file is a file that has not been processed yet and is not currently being processed. 
+ """ + new_files = [] for files in known_files.values(): for file in files: - if file not in self._file_stats: # todo: store stats by bundle also? - # We found new file after refreshing dir. add to parsing queue at start - self.log.info("Adding new file %s to parsing queue", file) - self._file_queue.appendleft(file) + # todo: store stats by bundle also? + if file not in self._file_stats and file not in self._processors: + new_files.append(file) + + if new_files: + self.log.info("Adding %d new files to the front of the queue", len(new_files)) + self._add_files_to_queue(new_files, True) def _resort_file_queue(self): if self._file_parsing_sort_mode == "modified_time" and self._file_queue: diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py index 5bf965f30a5..922543add0c 100644 --- a/airflow-core/tests/unit/dag_processing/test_manager.py +++ b/airflow-core/tests/unit/dag_processing/test_manager.py @@ -337,7 +337,7 @@ class TestDagFileProcessorManager: manager.prepare_file_queue( known_files={"any": set((*dag_files, *_get_file_infos(["file_4-ss=1.0.py"])))} ) - # manager.add_files_to_queue() + # manager._add_new_files_to_queue() ordered_files = _get_file_infos( [ "file_3-ss=4.0.py", @@ -348,6 +348,41 @@ class TestDagFileProcessorManager: ) assert manager._file_queue == deque(ordered_files) + def test_add_new_files_to_queue_behavior(self): + """ + Check that _add_new_files_to_queue: + 1. Adds new files to the front of the queue. + 2. Skips files that are currently being processed. + 3. Skips files that have already been processed (in _file_stats). + 4. Does not re-add files already in the queue. 
+ """ + manager = DagFileProcessorManager(max_runs=1) + file_1 = DagFileInfo(bundle_name="testing", rel_path=Path("file_1.py"), bundle_path=TEST_DAGS_FOLDER) + file_2 = DagFileInfo(bundle_name="testing", rel_path=Path("file_2.py"), bundle_path=TEST_DAGS_FOLDER) + file_3 = DagFileInfo(bundle_name="testing", rel_path=Path("file_3.py"), bundle_path=TEST_DAGS_FOLDER) + file_4 = DagFileInfo(bundle_name="testing", rel_path=Path("file_4.py"), bundle_path=TEST_DAGS_FOLDER) + + # Setup: + # file_1 is already in the queue + manager._file_queue = deque([file_1]) + + # file_3 is currently being processed + manager._processors[file_3] = MagicMock() + + # file_4 has already been processed + manager._file_stats[file_4] = DagFileStat(num_dags=1) + + # known_files contains all four + known_files = {"testing": {file_1, file_2, file_3, file_4}} + + manager._add_new_files_to_queue(known_files) + + # file_4 should be ignored (in file_stats) + # file_3 should be ignored (processing) + # file_2 should be at the front (new) + # file_1 should remain (already in queue) + assert list(manager._file_queue) == [file_2, file_1] + @conf_vars({("dag_processor", "file_parsing_sort_mode"): "modified_time"}) @mock.patch("airflow.utils.file.os.path.getmtime", new=mock_get_mtime) def test_resort_file_queue_by_mtime(self):
