kaxil commented on a change in pull request #15046:
URL: https://github.com/apache/airflow/pull/15046#discussion_r603359237



##########
File path: airflow/utils/dag_processing.py
##########
@@ -1017,25 +1019,50 @@ def prepare_file_path_queue(self):
         # processed recently, wait until the next batch
         file_paths_in_progress = self._processors.keys()
         now = timezone.utcnow()
+
+        # Sort the file paths by the parsing order mode
+        list_mode = conf.get("scheduler", "file_parsing_sort_mode")
+
+        files_with_mtime = {}
+        file_paths = []
+        is_mtime_mode = list_mode == "modified_time"
+
         file_paths_recently_processed = []
         for file_path in self._file_paths:
+
+            if is_mtime_mode:
+                files_with_mtime[file_path] = os.path.getmtime(file_path)
+            else:
+                file_paths.append(file_path)
+
+            # Find file paths that were recently processed
             last_finish_time = self.get_last_finish_time(file_path)
             if (
                 last_finish_time is not None
                 and (now - last_finish_time).total_seconds() < 
self._file_process_interval
             ):
                 file_paths_recently_processed.append(file_path)
 
+        # Sort file paths via last modified time
+        if is_mtime_mode:
+            file_paths = sorted(files_with_mtime, key=files_with_mtime.get, 
reverse=True)
+        elif list_mode == "alphabetical":
+            file_paths = sorted(file_paths)
+        elif list_mode == "random_seeded_by_host":
+            # Shuffle the list seeded by hostname so multiple schedulers can 
work on different
+            # set of files. Since we set the seed, the sort order will remain 
same per host
+            random.Random(get_hostname()).shuffle(file_paths)
+
         files_paths_at_run_limit = [
             file_path for file_path, stat in self._file_stats.items() if 
stat.run_count == self._max_runs
         ]
 
-        files_paths_to_queue = list(
-            set(self._file_paths)
-            - set(file_paths_in_progress)
-            - set(file_paths_recently_processed)
-            - set(files_paths_at_run_limit)
-        )
+        files_paths_to_queue = [
+            file_path
+            for file_path in file_paths
+            if file_path
+            not in 
set(file_paths_in_progress).union(file_paths_recently_processed, 
files_paths_at_run_limit)
+        ]

Review comment:
       Added a comment before the code block to explain.
   
   Basically, since we were converting the list to a set just to exclude certain 
items, the order of `file_paths` was no longer preserved, which would make the 
sort modes useless. Hence the current change: using a list preserves insertion 
order, and the comprehension removes any items that also appear in 
`file_paths_in_progress`, `file_paths_recently_processed`, or 
`files_paths_at_run_limit`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to