argibbs commented on code in PR #25489:
URL: https://github.com/apache/airflow/pull/25489#discussion_r936122004


##########
airflow/dag_processing/manager.py:
##########
@@ -993,19 +1027,56 @@ def _create_process(file_path, pickle_dags, dag_ids, callback_requests):
             file_path=file_path, pickle_dags=pickle_dags, dag_ids=dag_ids, callback_requests=callback_requests
         )
 
+    def is_overdue(self, file_path: str, as_of: datetime) -> bool:
+        if self._max_file_process_interval <= 0:
+            return False
+        last_finish_time = self.get_last_finish_time(file_path)
+        return (
+            last_finish_time is not None
+            and (as_of - last_finish_time).total_seconds() > self._max_file_process_interval
+        )
+
     def start_new_processes(self):
         """Start more processors if we have enough slots and files to process"""
-        while self._parallelism - len(self._processors) > 0 and self._file_path_queue:
-            file_path = self._file_path_queue.pop(0)
-            # Stop creating duplicate processor i.e. processor with the same filepath
+        # check for files stuck on the std queue for too long; these need to be processed first, as they
+        # suggest that we found files on disk a long time ago and still haven't processed them
+        now = timezone.utcnow()
+        overdue_std_files_list = [p for p in self._file_paths if self.is_overdue(p, now)]

Review Comment:
   Note we're checking all known files (`self._file_paths`) rather than the std queue. This is because if the priority queue is spinning, then the std queue won't empty, and won't reload from disk, and there might be files on disk not in the std queue.
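
   The overdue check itself can be sketched standalone. The snippet below is a hypothetical, self-contained version of the `is_overdue` logic from the diff (the `MAX_FILE_PROCESS_INTERVAL` value and the free-function signature are illustrative assumptions, not Airflow's actual API): a file counts as overdue when its last recorded finish time is more than the configured interval before `as_of`, and the check is disabled entirely when the interval is non-positive.

   ```python
   from datetime import datetime, timedelta, timezone

   # Assumed config value, in seconds; a non-positive value disables the check.
   MAX_FILE_PROCESS_INTERVAL = 30.0


   def is_overdue(last_finish_time, as_of, interval=MAX_FILE_PROCESS_INTERVAL):
       # Hypothetical standalone sketch of the method added in the diff.
       if interval <= 0:
           return False  # feature disabled
       return (
           # A file with no recorded finish time is never "overdue" here;
           # it simply hasn't completed a parse yet.
           last_finish_time is not None
           and (as_of - last_finish_time).total_seconds() > interval
       )


   now = datetime.now(timezone.utc)
   # Finished parsing 60s ago -> overdue under a 30s interval.
   print(is_overdue(now - timedelta(seconds=60), now))  # True
   # Never parsed yet -> not overdue.
   print(is_overdue(None, now))  # False
   ```

   Note the asymmetry this implies: brand-new files (no finish time yet) are handled by the normal queue, and only files that were parsed once and then starved are promoted.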



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
