This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v3-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 83e4b5ffcdd85f8bdfc4ffd18d4f95a086aa4dd1 Author: Jed Cunningham <[email protected]> AuthorDate: Fri Jan 2 00:51:45 2026 -0700 Optimized DAG processing queue order on bundle refresh (#60003) When file_parsing_sort_mode is set to modified_time, the DAG file queue is now re-sorted by modification time whenever a bundle is initialized or refreshed. This ensures that existing files in the queue are prioritized correctly based on their recency, rather than retaining their old order. New files discovered during the refresh are still added to the head of the queue for immediate processing. (cherry picked from commit bbf234edb613610e4215fa190b8a70a8f23cd1ae) --- airflow-core/src/airflow/dag_processing/manager.py | 24 ++++++++--- .../tests/unit/dag_processing/test_manager.py | 46 ++++++++++++++++++++++ 2 files changed, 65 insertions(+), 5 deletions(-) diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py index fed39d77675..bdfd5220346 100644 --- a/airflow-core/src/airflow/dag_processing/manager.py +++ b/airflow-core/src/airflow/dag_processing/manager.py @@ -225,6 +225,10 @@ class DagFileProcessorManager(LoggingMixin): _force_refresh_bundles: set[str] = attrs.field(factory=set, init=False) """List of bundles that need to be force refreshed in the next loop""" + _file_parsing_sort_mode: str = attrs.field( + factory=_config_get_factory("dag_processor", "file_parsing_sort_mode") + ) + _api_server: InProcessExecutionAPI = attrs.field(init=False, factory=InProcessExecutionAPI) """API server to interact with Metadata DB""" @@ -523,12 +527,14 @@ class DagFileProcessorManager(LoggingMixin): self._bundles_last_refreshed = now_seconds + any_refreshed = False for bundle in self._dag_bundles: # TODO: AIP-66 handle errors in the case of incomplete cloning? And test this. 
# What if the cloning/refreshing took too long(longer than the dag processor timeout) if not bundle.is_initialized: try: bundle.initialize() + any_refreshed = True except AirflowException as e: self.log.exception("Error initializing bundle %s: %s", bundle.name, e) continue @@ -563,6 +569,7 @@ class DagFileProcessorManager(LoggingMixin): try: bundle.refresh() + any_refreshed = True except Exception: self.log.exception("Error refreshing bundle %s", bundle.name) continue @@ -599,7 +606,6 @@ class DagFileProcessorManager(LoggingMixin): } known_files[bundle.name] = found_files - self.handle_removed_files(known_files=known_files) self.deactivate_deleted_dags(bundle_name=bundle.name, present=found_files) self.clear_orphaned_import_errors( @@ -607,6 +613,10 @@ class DagFileProcessorManager(LoggingMixin): observed_filelocs={str(x.rel_path) for x in found_files}, # todo: make relative ) + if any_refreshed: + self.handle_removed_files(known_files=known_files) + self._resort_file_queue() + def _find_files_in_bundle(self, bundle: BaseDagBundle) -> list[Path]: """Get relative paths for dag files from bundle dir.""" # Build up a list of Python files that could contain DAGs @@ -960,6 +970,11 @@ class DagFileProcessorManager(LoggingMixin): self.log.info("Adding new file %s to parsing queue", file) self._file_queue.appendleft(file) + def _resort_file_queue(self): + if self._file_parsing_sort_mode == "modified_time" and self._file_queue: + files, _ = self._sort_by_mtime(self._file_queue) + self._file_queue = deque(files) + def _sort_by_mtime(self, files: Iterable[DagFileInfo]): files_with_mtime: dict[DagFileInfo, float] = {} changed_recently = set() @@ -1002,7 +1017,6 @@ class DagFileProcessorManager(LoggingMixin): now = timezone.utcnow() # Sort the file paths by the parsing order mode - list_mode = conf.get("dag_processor", "file_parsing_sort_mode") recently_processed = set() files = [] @@ -1013,11 +1027,11 @@ class DagFileProcessorManager(LoggingMixin): recently_processed.add(file) 
changed_recently: set[DagFileInfo] = set() - if list_mode == "modified_time": + if self._file_parsing_sort_mode == "modified_time": files, changed_recently = self._sort_by_mtime(files=files) - elif list_mode == "alphabetical": + elif self._file_parsing_sort_mode == "alphabetical": files.sort(key=attrgetter("rel_path")) - elif list_mode == "random_seeded_by_host": + elif self._file_parsing_sort_mode == "random_seeded_by_host": # Shuffle the list seeded by hostname so multiple DAG processors can work on different # set of files. Since we set the seed, the sort order will remain same per host random.Random(get_hostname()).shuffle(files) diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py index 456d925935b..5bf965f30a5 100644 --- a/airflow-core/tests/unit/dag_processing/test_manager.py +++ b/airflow-core/tests/unit/dag_processing/test_manager.py @@ -348,6 +348,52 @@ class TestDagFileProcessorManager: ) assert manager._file_queue == deque(ordered_files) + @conf_vars({("dag_processor", "file_parsing_sort_mode"): "modified_time"}) + @mock.patch("airflow.utils.file.os.path.getmtime", new=mock_get_mtime) + def test_resort_file_queue_by_mtime(self): + """ + Check that existing files in the queue are re-sorted by mtime when calling _resort_file_queue, + if sort mode is modified_time. 
+ """ + # Prepare some files with mtimes + files_with_mtime = [ + ("file_1.py", 100.0), + ("file_2.py", 200.0), + ] + filenames = encode_mtime_in_filename(files_with_mtime) + dag_files = _get_file_infos(filenames) + # dag_files[0] -> file_1 (mtime 100) + # dag_files[1] -> file_2 (mtime 200) + + manager = DagFileProcessorManager(max_runs=1) + + # Populate queue with unsorted files + # Queue: [file_1 (100), file_2 (200)] + manager._file_queue = deque([dag_files[0], dag_files[1]]) + + manager._resort_file_queue() + + # Verify resort happened: [file_2 (200), file_1 (100)] + assert list(manager._file_queue) == [dag_files[1], dag_files[0]] + + @conf_vars({("dag_processor", "file_parsing_sort_mode"): "alphabetical"}) + def test_resort_file_queue_does_nothing_when_alphabetical(self): + """ + Check that _resort_file_queue does NOT change the order if sort mode is alphabetical. + """ + file_a = DagFileInfo(bundle_name="testing", rel_path=Path("a.py"), bundle_path=TEST_DAGS_FOLDER) + file_b = DagFileInfo(bundle_name="testing", rel_path=Path("b.py"), bundle_path=TEST_DAGS_FOLDER) + + manager = DagFileProcessorManager(max_runs=1) + + # Populate queue in non-alphabetical order + manager._file_queue = deque([file_b, file_a]) + + manager._resort_file_queue() + + # Order should remain unchanged + assert list(manager._file_queue) == [file_b, file_a] + @conf_vars({("dag_processor", "file_parsing_sort_mode"): "modified_time"}) @mock.patch("airflow.utils.file.os.path.getmtime") def test_recently_modified_file_is_parsed_with_mtime_mode(self, mock_getmtime):
