pavansharma36 commented on code in PR #27060:
URL: https://github.com/apache/airflow/pull/27060#discussion_r996308567
##########
airflow/dag_processing/manager.py:
##########
@@ -1041,6 +1047,18 @@ def start_new_processes(self):
self._processors[file_path] = processor
self.waitables[processor.waitable_handle] = processor
+
+ def add_new_file_path_to_queue(self):
+ for file_path in self.file_paths:
+ if file_path not in self._file_stats:
+ # We found new file after refreshing dir. add to parsing queue
at start
+ self.log.info('Adding new file %s to parsing queue', file_path)
+ self._file_stats[file_path] = DagFileStat(
+ num_dags=0, import_errors=0, last_finish_time=None,
last_duration=None, run_count=0
+ )
+ self._file_path_queue.insert(0, file_path)
Review Comment:
@ephraimbuddy insert with 0 index is already used in function
_add_callback_to_queue
should I change type to deque it will affect multiple places?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]