argibbs commented on code in PR #25489:
URL: https://github.com/apache/airflow/pull/25489#discussion_r936089352
##########
airflow/dag_processing/manager.py:
##########
@@ -378,8 +378,17 @@ def __init__(
async_mode: bool = True,
):
super().__init__()
+ self._log = logging.getLogger('airflow.processor_manager')
Review Comment:
I moved this up from further down `__init__` because I added a log message
that runs before the point where the logger used to be assigned.
It seems ... weird ... that this doesn't just use the logging mixin. Is this
an artefact from Ye Olden Times?
##########
airflow/dag_processing/manager.py:
##########
@@ -378,8 +378,17 @@ def __init__(
async_mode: bool = True,
):
super().__init__()
+ self._log = logging.getLogger('airflow.processor_manager')
+
+ # known files; this will be updated every `dag_dir_list_interval` and stuff added/removed accordingly
self._file_paths: List[str] = []
- self._file_path_queue: List[str] = []
+
+ # we maintain 2 queues: stuff requiring rapid response due to scheduler updates, and stuff that
+ # should be serviced once the priority stuff has all been worked through, e.g. periodic dir scans
+ # additionally there's a set to track which files on disk still haven't been refreshed yet
+ self._priority_file_path_queue: Deque[str] = deque()
Review Comment:
I changed these from `list`s to `deque`s because teeeeechnically it's more
efficient: popping from the front of a `deque` is O(1), whereas `list.pop(0)`
is O(n).
Also, I like seeing people try to work out how to pronounce 'deque'.
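For concreteness, here is a minimal sketch of the front-pop difference and of the "priority first, then standard" drain order described in the code comment. The helper `next_file_path` and the file names are hypothetical and are not part of this PR:

```python
from collections import deque
from typing import Deque, List, Optional


def next_file_path(priority: Deque[str], std: Deque[str]) -> Optional[str]:
    """Pop the next path, draining the priority queue before the standard one."""
    if priority:
        # popleft() is O(1); list.pop(0) would shift every remaining item.
        return priority.popleft()
    if std:
        return std.popleft()
    return None


# Hypothetical queue contents, for illustration only.
priority_queue: Deque[str] = deque(["callback_dag.py"])
std_queue: Deque[str] = deque(["file_1.py", "file_2.py"])

order: List[str] = []
while (path := next_file_path(priority_queue, std_queue)) is not None:
    order.append(path)
```

With the queues above, the priority entry is served first and the standard entries follow in their original order.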
##########
airflow/dag_processing/manager.py:
##########
@@ -539,7 +559,7 @@ def _run_parsing_loop(self):
poll_time = None
self._refresh_dag_dir()
- self.prepare_file_path_queue()
+ self.populate_std_file_queue_from_dir()
Review Comment:
Same method, different name. I thought this was more descriptive.
##########
tests/dag_processing/test_manager.py:
##########
@@ -299,9 +299,9 @@ def test_file_paths_in_queue_sorted_alphabetically(
)
manager.set_file_paths(dag_files)
- assert manager._file_path_queue == []
- manager.prepare_file_path_queue()
- assert manager._file_path_queue == ['file_1.py', 'file_2.py', 'file_3.py', 'file_4.py']
+ assert list(manager._std_file_path_queue) == []
Review Comment:
The tests now have to convert each `deque` to a list before comparing, because
a `deque` never compares equal to a `list`. This is clunky.
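A small sketch of why the conversion is needed, using only the standard library (the file names are illustrative):

```python
from collections import deque

queue = deque(["file_1.py", "file_2.py"])
expected = ["file_1.py", "file_2.py"]

# A deque and a list are different types, so == is False even with equal items.
direct = queue == expected

# Converting one side makes the comparison element-wise again.
converted = list(queue) == expected

# Alternatively, build the expected value as a deque.
same_type = queue == deque(expected)
```

One possible alternative for the tests would be to put the `deque(...)` on the expected side rather than wrapping the attribute in `list(...)`; which reads better is a matter of taste.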
##########
airflow/dag_processing/manager.py:
##########
@@ -921,8 +944,18 @@ def set_file_paths(self, new_file_paths):
:param new_file_paths: list of paths to DAG definition files
:return: None
"""
+ # store the new paths
self._file_paths = new_file_paths
- self._file_path_queue = [x for x in self._file_path_queue if x in new_file_paths]
+
+ # clean up the queues; remove anything queued which no longer in the list, including callbacks
+ self._priority_file_path_queue = deque(
+ x for x in self._priority_file_path_queue if x in new_file_paths
+ )
+ self._std_file_path_queue = deque(x for x in self._std_file_path_queue if x in new_file_paths)
+ callback_paths_to_del = list(x for x in self._callback_to_execute.keys() if x not in new_file_paths)
+ for path_to_del in callback_paths_to_del:
Review Comment:
This more thorough clean-up is new. Having the old callbacks lying around
was a form of memory leak, but in practice not a serious one.
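To illustrate the intent, here is a standalone sketch of the same clean-up. The function name `prune_stale_paths` and the sample data are hypothetical, and the `del` in the loop is an assumption, since the hunk above is cut off before the loop body. The sketch also builds a `set` for the membership checks, which the PR code does not do:

```python
from collections import deque
from typing import Deque, Dict, Iterable, List


def prune_stale_paths(
    queue: Deque[str],
    callbacks: Dict[str, List[str]],
    new_file_paths: Iterable[str],
) -> Deque[str]:
    """Return the queue without stale paths and drop stale callback entries in place."""
    keep = set(new_file_paths)  # O(1) membership checks
    pruned_queue = deque(p for p in queue if p in keep)
    # Collect the stale keys first; deleting while iterating the dict would raise.
    for stale in [p for p in callbacks if p not in keep]:
        del callbacks[stale]  # assumed behaviour of the truncated loop body
    return pruned_queue


# Hypothetical data, for illustration only.
queue = deque(["a.py", "b.py", "c.py"])
callbacks = {"a.py": ["cb1"], "gone.py": ["cb2"]}
queue = prune_stale_paths(queue, callbacks, ["a.py", "c.py"])
```

After the call, only paths still present in the new list remain queued, and the callbacks registered for `gone.py` are released.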