ashb commented on a change in pull request #15046:
URL: https://github.com/apache/airflow/pull/15046#discussion_r603544147



##########
File path: airflow/utils/dag_processing.py
##########
@@ -1017,26 +1019,54 @@ def prepare_file_path_queue(self):
         # processed recently, wait until the next batch
         file_paths_in_progress = self._processors.keys()
         now = timezone.utcnow()
+
+        # Sort the file paths by the parsing order mode
+        list_mode = conf.get("scheduler", "file_parsing_sort_mode")
+
+        files_with_mtime = {}
+        file_paths = []
+        is_mtime_mode = list_mode == "modified_time"
+
         file_paths_recently_processed = []
         for file_path in self._file_paths:
+
+            if is_mtime_mode:
+                files_with_mtime[file_path] = os.path.getmtime(file_path)
+            else:
+                file_paths.append(file_path)
+
+            # Find file paths that were recently processed
             last_finish_time = self.get_last_finish_time(file_path)
             if (
                 last_finish_time is not None
                 and (now - last_finish_time).total_seconds() < 
self._file_process_interval
             ):
                 file_paths_recently_processed.append(file_path)
 
+        # Sort file paths via last modified time
+        if is_mtime_mode:
+            file_paths = sorted(files_with_mtime, key=files_with_mtime.get, 
reverse=True)
+        elif list_mode == "alphabetical":
+            file_paths = sorted(file_paths)
+        elif list_mode == "random_seeded_by_host":
+            # Shuffle the list seeded by hostname so multiple schedulers can 
work on different
+            # set of files. Since we set the seed, the sort order will remain 
same per host
+            random.Random(get_hostname()).shuffle(file_paths)
+
         files_paths_at_run_limit = [
             file_path for file_path, stat in self._file_stats.items() if 
stat.run_count == self._max_runs
         ]
 
-        files_paths_to_queue = list(
-            set(self._file_paths)
-            - set(file_paths_in_progress)
-            - set(file_paths_recently_processed)
-            - set(files_paths_at_run_limit)
+        # Do not convert the following list to set as set does not preserve 
the order
+        # and we need to maintain the order of file_paths for `[scheduler] 
file_parsing_sort_mode`
+        file_paths_to_exclude = set(file_paths_in_progress).union(
+            file_paths_recently_processed, files_paths_at_run_limit
         )
 
+        files_paths_to_queue = [

Review comment:
       ```suggestion
           file_paths_to_exclude = set(file_paths_in_progress).union(
               file_paths_recently_processed, files_paths_at_run_limit
           )
   
           # Do not convert the following list to set as set does not preserve 
the order
           # and we need to maintain the order of file_paths for `[scheduler] 
file_parsing_sort_mode`
           files_paths_to_queue = [
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to