kaxil commented on a change in pull request #13662:
URL: https://github.com/apache/airflow/pull/13662#discussion_r557436671
##########
File path: airflow/utils/dag_processing.py
##########
@@ -988,6 +993,15 @@ def start_new_processes(self):
         """Start more processors if we have enough slots and files to process"""
         while self._parallelism - len(self._processors) > 0 and self._file_path_queue:
             file_path = self._file_path_queue.pop(0)
+            # Stop creating duplicate processor i.e. processor with the same filepath
+            if file_path in self._processors.keys():
+                # If filepath is no longer in the queue and if callback exists to execute
+                # we add the filepath to the end of queue so it still runs before
+                # new filepaths are added to the queue as we should runs callbacks asap
+                if file_path not in self._file_path_queue and self._callback_to_execute[file_path]:
+                    self._file_path_queue.append(file_path)
Review comment:
I am in two minds about this. We could remove these lines and let the
callback get executed when the file is next added to file_path_queue.
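
To make the trade-off concrete, here is a minimal standalone sketch of the two options. It is not the PR's code: `take_one` and its `requeue` flag are hypothetical names, and a plain list and dicts stand in for `_file_path_queue`, `_processors` and `_callback_to_execute`.

```python
def take_one(file_path_queue, processors, callback_to_execute, requeue=True):
    """Pop one path from the queue.

    Returns the path if a new processor may be started for it, or None if a
    processor for that path already exists (the duplicate case in the diff).
    """
    file_path = file_path_queue.pop(0)
    if file_path not in processors:
        return file_path
    # Duplicate: optionally put the path back at the end of the queue,
    # but only if it has pending callbacks and is not already queued.
    if requeue and file_path not in file_path_queue and callback_to_execute.get(file_path):
        file_path_queue.append(file_path)
    return None


if __name__ == "__main__":
    processors = {"a.py": object()}          # a.py is already being processed
    callbacks = {"a.py": ["some_callback"]}  # and has a pending callback

    queue = ["a.py", "b.py"]
    take_one(queue, processors, callbacks, requeue=True)
    print(queue)   # a.py moved behind b.py

    queue = ["a.py", "b.py"]
    take_one(queue, processors, callbacks, requeue=False)
    print(queue)   # a.py dropped until something adds it again
```

With `requeue=False` the pending callback simply waits until the file is queued again by whatever normally refills the queue, which is the simpler behaviour suggested above. With `requeue=True` the path stays in the queue while its processor is still running, so in this sketch a caller that keeps popping would see it again on every pass.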