jedcunningham commented on code in PR #60864:
URL: https://github.com/apache/airflow/pull/60864#discussion_r2722593636


##########
airflow-core/src/airflow/dag_processing/manager.py:
##########
@@ -989,8 +990,28 @@ def _add_new_files_to_queue(self, known_files: dict[str, set[DagFileInfo]]):
 
     def _resort_file_queue(self):
         if self._file_parsing_sort_mode == "modified_time" and self._file_queue:
-            files, _ = self._sort_by_mtime(self._file_queue)
-            self._file_queue = deque(files)
+            files_with_mtime: dict[DagFileInfo, float] = {}
+            mtime_changed = False
+
+            for file in list(self._file_queue):
+                try:
+                    mtime = os.path.getmtime(file.absolute_path)
+                    files_with_mtime[file] = mtime
+                    stat = self._file_stats[file]  # Creates entry via defaultdict if missing
+                    if stat.last_mtime != mtime:
+                        mtime_changed = True
+                        stat.last_mtime = mtime
+                except FileNotFoundError:
+                    self.log.warning("Skipping processing of missing file: %s", file)
+                    self._file_stats.pop(file, None)
+                    mtime_changed = True  # Queue structure changed
+
+            if not mtime_changed:
+                return  # No changes, skip sorting
+
+            # Sort by mtime descending and rebuild queue
+            sorted_files = [f for f, _ in sorted(files_with_mtime.items(), key=itemgetter(1), reverse=True)]

Review Comment:
   Wouldn't this put new files at the end of the list?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to