kaxil commented on a change in pull request #13662:
URL: https://github.com/apache/airflow/pull/13662#discussion_r557436671



##########
File path: airflow/utils/dag_processing.py
##########
@@ -988,6 +993,15 @@ def start_new_processes(self):
         """Start more processors if we have enough slots and files to process"""
         while self._parallelism - len(self._processors) > 0 and self._file_path_queue:
             file_path = self._file_path_queue.pop(0)
+            # Stop creating duplicate processor i.e. processor with the same filepath
+            if file_path in self._processors.keys():
+                # If filepath is no longer in the queue and if callback exists to execute
+                # we add the filepath to the end of queue so it still runs before
+                # new filepaths are added to the queue as we should runs callbacks asap
+                if file_path not in self._file_path_queue and self._callback_to_execute[file_path]:
+                    self._file_path_queue.append(file_path)

Review comment:
       I am in two minds about this. We could remove these lines and let the callback execute when the file is next added to file_path_queue.
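       To make the trade-off concrete, here is a minimal Python sketch that models only the queue logic from the hunk above. `ToyProcessorManager`, `queue`, `add_callback`, `finish` and the `requeue_pending_callbacks` flag are hypothetical names for illustration, not Airflow APIs, and a "processor" is just the list of callbacks it was started with. One deliberate difference from the diff: re-queued paths are collected and appended after the loop, so a single pass cannot keep popping and re-appending the same path while a slot is free.

```python
from typing import Dict, List


class ToyProcessorManager:
    """Hypothetical, simplified model of the dedup/re-queue logic in the diff.

    Not Airflow's real manager: a "processor" is just the list of
    callbacks it was started with.
    """

    def __init__(self, parallelism: int, requeue_pending_callbacks: bool) -> None:
        self._parallelism = parallelism
        self._requeue = requeue_pending_callbacks
        self._processors: Dict[str, List[str]] = {}
        self._file_path_queue: List[str] = []
        self._callback_to_execute: Dict[str, List[str]] = {}

    def queue(self, *file_paths: str) -> None:
        # Append each path unless it is already waiting in the queue.
        for path in file_paths:
            if path not in self._file_path_queue:
                self._file_path_queue.append(path)

    def add_callback(self, file_path: str, callback: str) -> None:
        # Record the callback and make sure the file gets scheduled.
        self._callback_to_execute.setdefault(file_path, []).append(callback)
        self.queue(file_path)

    def start_new_processes(self) -> List[str]:
        started: List[str] = []
        deferred: List[str] = []
        while self._parallelism - len(self._processors) > 0 and self._file_path_queue:
            file_path = self._file_path_queue.pop(0)
            if file_path in self._processors:
                # Duplicate: a processor for this file is still running, so skip it.
                if (
                    self._requeue
                    and file_path not in self._file_path_queue
                    and file_path not in deferred
                    and self._callback_to_execute.get(file_path)
                ):
                    deferred.append(file_path)
                continue
            # The new processor takes ownership of any pending callbacks.
            self._processors[file_path] = self._callback_to_execute.pop(file_path, [])
            started.append(file_path)
        # Re-queued paths go to the end of the queue only after the loop ends.
        self._file_path_queue.extend(deferred)
        return started

    def finish(self, file_path: str) -> List[str]:
        """Mark the processor done and return the callbacks it executed."""
        return self._processors.pop(file_path)


for requeue in (True, False):
    m = ToyProcessorManager(parallelism=2, requeue_pending_callbacks=requeue)
    m.queue("a.py")
    m.start_new_processes()        # a.py processor starts with no callbacks
    m.add_callback("a.py", "cb1")  # callback arrives while it is still running
    m.start_new_processes()        # the duplicate a.py is skipped
    print(requeue, m._file_path_queue)  # True ['a.py'], then False []
```

       With re-queueing, the pending callback keeps a.py in the queue. Without it, the duplicate is dropped, but cb1 is not lost: it stays in _callback_to_execute and runs the next time a.py is queued. The cost is latency, since the callback waits for that next queueing instead of running as soon as a slot frees up.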




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
