ashb commented on a change in pull request #15046:
URL: https://github.com/apache/airflow/pull/15046#discussion_r603532950
##########
File path: airflow/utils/dag_processing.py
##########
@@ -1017,25 +1019,50 @@ def prepare_file_path_queue(self):
# processed recently, wait until the next batch
file_paths_in_progress = self._processors.keys()
now = timezone.utcnow()
+
+ # Sort the file paths by the parsing order mode
+ list_mode = conf.get("scheduler", "file_parsing_sort_mode")
+
+ files_with_mtime = {}
+ file_paths = []
+ is_mtime_mode = list_mode == "modified_time"
+
file_paths_recently_processed = []
for file_path in self._file_paths:
+
+ if is_mtime_mode:
+ files_with_mtime[file_path] = os.path.getmtime(file_path)
+ else:
+ file_paths.append(file_path)
+
+ # Find file paths that were recently processed
last_finish_time = self.get_last_finish_time(file_path)
if (
last_finish_time is not None
and (now - last_finish_time).total_seconds() <
self._file_process_interval
):
file_paths_recently_processed.append(file_path)
+ # Sort file paths via last modified time
+ if is_mtime_mode:
+ file_paths = sorted(files_with_mtime, key=files_with_mtime.get,
reverse=True)
+ elif list_mode == "alphabetical":
+ file_paths = sorted(file_paths)
+ elif list_mode == "random_seeded_by_host":
+ # Shuffle the list seeded by hostname so multiple schedulers can
work on different
+ # set of files. Since we set the seed, the sort order will remain
same per host
+ random.Random(get_hostname()).shuffle(file_paths)
+
files_paths_at_run_limit = [
file_path for file_path, stat in self._file_stats.items() if
stat.run_count == self._max_runs
]
- files_paths_to_queue = list(
- set(self._file_paths)
- - set(file_paths_in_progress)
- - set(file_paths_recently_processed)
- - set(files_paths_at_run_limit)
- )
+ files_paths_to_queue = [
+ file_path
+ for file_path in file_paths
+ if file_path
+ not in
set(file_paths_in_progress).union(file_paths_recently_processed,
files_paths_at_run_limit)
+ ]
Review comment:
OH YEAH :D
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org