argibbs commented on code in PR #25489:
URL: https://github.com/apache/airflow/pull/25489#discussion_r936089352


##########
airflow/dag_processing/manager.py:
##########
@@ -378,8 +378,17 @@ def __init__(
         async_mode: bool = True,
     ):
         super().__init__()
+        self._log = logging.getLogger('airflow.processor_manager')

Review Comment:
   I moved this up from further down `__init__` because I added a log message 
that is emitted before the point where `_log` used to be set.
   
   It seems ... weird ... that this doesn't just use the logging mixin. Is this 
an artefact from Ye Olden Times?



##########
airflow/dag_processing/manager.py:
##########
@@ -378,8 +378,17 @@ def __init__(
         async_mode: bool = True,
     ):
         super().__init__()
+        self._log = logging.getLogger('airflow.processor_manager')
+
+        # known files; this will be updated every `dag_dir_list_interval` and stuff added/removed accordingly
         self._file_paths: List[str] = []
-        self._file_path_queue: List[str] = []
+
+        # we maintain 2 queues: stuff requiring rapid response due to scheduler updates, and stuff that
+        # should be serviced once the priority stuff has all been worked through, e.g. periodic dir scans
+        # additionally there's a set to track which files on disk still haven't been refreshed yet
+        self._priority_file_path_queue: Deque[str] = deque()

Review Comment:
   I changed these from `list`s to `deque`s because teeeeechnically it's more 
efficient: taking an item off the front of a `deque` is O(1), whereas 
`list.pop(0)` is O(n).
   
   Also, I like seeing people try to work out how to pronounce 'deque'.
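   
   For illustration, a minimal sketch of how the two queues described in the 
code comment could be drained, priority entries first. This is not code from 
the PR: `pop_next_path` and the file names are hypothetical, and only the 
"priority before standard" ordering is taken from the comment above.

```python
from collections import deque
from typing import Deque, List, Optional


def pop_next_path(priority: Deque[str], standard: Deque[str]) -> Optional[str]:
    """Return the next file path to process, or None if both queues are empty.

    Hypothetical helper: priority entries are always served before standard ones.
    """
    if priority:
        return priority.popleft()  # constant-time removal from the front
    if standard:
        return standard.popleft()
    return None


# Hypothetical queue contents, for demonstration only.
priority_queue: Deque[str] = deque(["callback_dag.py"])
std_queue: Deque[str] = deque(["file_1.py", "file_2.py"])

order: List[str] = []
while (path := pop_next_path(priority_queue, std_queue)) is not None:
    order.append(path)

print(order)
```

   With the contents above, the priority entry is returned before either of 
the standard entries.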



##########
airflow/dag_processing/manager.py:
##########
@@ -539,7 +559,7 @@ def _run_parsing_loop(self):
             poll_time = None
 
         self._refresh_dag_dir()
-        self.prepare_file_path_queue()
+        self.populate_std_file_queue_from_dir()

Review Comment:
   Same method, different name. I thought this was more descriptive.



##########
tests/dag_processing/test_manager.py:
##########
@@ -299,9 +299,9 @@ def test_file_paths_in_queue_sorted_alphabetically(
         )
 
         manager.set_file_paths(dag_files)
-        assert manager._file_path_queue == []
-        manager.prepare_file_path_queue()
-        assert manager._file_path_queue == ['file_1.py', 'file_2.py', 'file_3.py', 'file_4.py']
+        assert list(manager._std_file_path_queue) == []

Review Comment:
   Have to convert all the `deque`s to lists, because a `deque` never compares 
equal to a `list`. This is clunky.
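   
   A short sketch of why the conversion is needed in the assertions. The 
variable names and file names here are made up for the example; the behaviour 
shown is that of the standard library's `collections.deque`.

```python
from collections import deque

queue = deque(["file_1.py", "file_2.py"])
expected = ["file_1.py", "file_2.py"]

# A deque and a list with the same items do not compare equal.
deque_equals_list = queue == expected

# Converting the deque to a list first gives the comparison the tests need.
list_equals_list = list(queue) == expected

# Alternatively, the expected value can be wrapped in a deque instead.
deque_equals_deque = queue == deque(expected)

print(deque_equals_list, list_equals_list, deque_equals_deque)
```

   Wrapping the expected value in `deque(...)` is an alternative to calling 
`list(...)` on the queue; which reads better in a test is a matter of taste.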



##########
airflow/dag_processing/manager.py:
##########
@@ -921,8 +944,18 @@ def set_file_paths(self, new_file_paths):
         :param new_file_paths: list of paths to DAG definition files
         :return: None
         """
+        # store the new paths
         self._file_paths = new_file_paths
-        self._file_path_queue = [x for x in self._file_path_queue if x in new_file_paths]
+
+        # clean up the queues; remove anything queued which is no longer in the list, including callbacks
+        self._priority_file_path_queue = deque(
+            x for x in self._priority_file_path_queue if x in new_file_paths
+        )
+        self._std_file_path_queue = deque(x for x in self._std_file_path_queue if x in new_file_paths)
+        callback_paths_to_del = list(x for x in self._callback_to_execute.keys() if x not in new_file_paths)
+        for path_to_del in callback_paths_to_del:

Review Comment:
   This more thorough clean-up is new. Having the old callbacks lying around 
was a form of memory leak, but in practice not a serious one.
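   
   As a rough, standalone sketch of the clean-up idea (not the PR's actual 
code): `prune_removed_paths` is a hypothetical function, the callbacks mapping 
is simplified to strings, and building a `set` for the membership checks is my 
own assumption rather than something the diff does.

```python
from collections import deque
from typing import Deque, Dict, Iterable, List, Tuple


def prune_removed_paths(
    new_file_paths: Iterable[str],
    priority_queue: Deque[str],
    std_queue: Deque[str],
    callbacks: Dict[str, List[str]],
) -> Tuple[Deque[str], Deque[str]]:
    """Drop queued paths and pending callbacks for files that no longer exist.

    Returns new, filtered deques and removes stale keys from `callbacks` in place.
    """
    known = set(new_file_paths)  # assumption: a set, for fast membership checks

    pruned_priority = deque(p for p in priority_queue if p in known)
    pruned_std = deque(p for p in std_queue if p in known)

    # Collect the stale keys first so the dict is not mutated while iterating.
    for stale in [p for p in callbacks if p not in known]:
        del callbacks[stale]

    return pruned_priority, pruned_std


# Hypothetical state before a directory refresh removes "gone.py".
pending_callbacks = {"a.py": ["cb1"], "gone.py": ["cb2"]}
new_priority, new_std = prune_removed_paths(
    ["a.py", "b.py"],
    deque(["a.py", "gone.py"]),
    deque(["gone.py", "b.py"]),
    pending_callbacks,
)
print(list(new_priority), list(new_std), pending_callbacks)
```

   Queue order is preserved for the surviving paths, and callbacks for removed 
files no longer accumulate.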



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.