kaxil commented on code in PR #60124:
URL: https://github.com/apache/airflow/pull/60124#discussion_r2665045804


##########
airflow-core/src/airflow/dag_processing/manager.py:
##########
@@ -963,13 +961,22 @@ def _start_new_processes(self):
             self._processors[file] = processor
             Stats.gauge("dag_processing.file_path_queue_size", len(self._file_queue))
 
-    def add_files_to_queue(self, known_files: dict[str, set[DagFileInfo]]):
+    def _add_new_files_to_queue(self, known_files: dict[str, set[DagFileInfo]]):
+        """
+        Add new files to the front of the queue.
+
+        A "new" file is a file that has not been processed yet and is not currently being processed.
+        """
+        new_files = []
         for files in known_files.values():
             for file in files:
-                if file not in self._file_stats:  # todo: store stats by bundle also?
-                    # We found new file after refreshing dir. add to parsing queue at start
-                    self.log.info("Adding new file %s to parsing queue", file)
-                    self._file_queue.appendleft(file)
+                # todo: store stats by bundle also?
+                if file not in self._file_stats and file not in self._processors:
+                    new_files.append(file)
+
+        if new_files:
+            self.log.info("Adding %d new files to the front of the queue", len(new_files))
+            self._add_files_to_queue(new_files, True)

Review Comment:
   Should we emit metrics even when no new files are found, to track that the 
check happened?
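
To illustrate what I mean, here is a rough, self-contained sketch rather than a proposed patch. `FakeStats` and `QueueSketch` are stand-ins I made up so the snippet runs on its own, and the metric name `dag_processing.new_files_found` is hypothetical. The only point is that the gauge is emitted on every check, including when the count is zero. The ordering produced by `extendleft` is also an assumption and may not match what `_add_files_to_queue` does.

```python
from collections import deque


class FakeStats:
    """Stand-in for a metrics client; records gauge calls in memory."""

    def __init__(self):
        self.gauges = []

    def gauge(self, name, value):
        self.gauges.append((name, value))


class QueueSketch:
    """Simplified stand-in for the manager; not the real class."""

    def __init__(self, stats):
        self.stats = stats
        self._file_stats = {}
        self._processors = {}
        self._file_queue = deque()

    def add_new_files_to_queue(self, known_files):
        # A file is "new" if it has no stats yet and is not being processed.
        new_files = [
            file
            for files in known_files.values()
            for file in files
            if file not in self._file_stats and file not in self._processors
        ]
        # Hypothetical metric name; emitted on every check, even when zero.
        self.stats.gauge("dag_processing.new_files_found", len(new_files))
        if new_files:
            # Assumption: new files go to the front of the queue.
            self._file_queue.extendleft(new_files)
        return new_files
```

With something like this, a flat line at zero would tell us the check is running but finding nothing, while a missing series would tell us the check is not running at all.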


