argibbs commented on code in PR #30076:
URL: https://github.com/apache/airflow/pull/30076#discussion_r1134393781


##########
airflow/dag_processing/manager.py:
##########
@@ -366,6 +366,10 @@ class DagFileProcessorManager(LoggingMixin):
     :param async_mode: whether to start the manager in async mode
     """
 
+    DEFAULT_FILE_STAT = DagFileStat(

Review Comment:
   Minor tweak: this is a named tuple, and thus immutable, so I'm reusing the same static instance over and over rather than periodically creating new ones.
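
   A minimal sketch of the idea, with illustrative fields (the real `DagFileStat` in airflow/dag_processing/manager.py has more):

   ```python
   from typing import NamedTuple

   class DagFileStat(NamedTuple):
       # Illustrative fields only; not the full set from the manager.
       num_dags: int = 0
       import_errors: int = 0
       run_count: int = 0

   # One shared, immutable default: safe to hand out repeatedly, since a
   # NamedTuple instance cannot be mutated in place.
   DEFAULT_FILE_STAT = DagFileStat()

   stats: dict[str, DagFileStat] = {}
   stat = stats.get("/dags/example.py", DEFAULT_FILE_STAT)  # no new allocation per lookup
   # Any "update" builds a fresh instance, so the shared default stays pristine:
   stats["/dags/example.py"] = stat._replace(run_count=stat.run_count + 1)
   ```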



##########
airflow/dag_processing/manager.py:
##########
@@ -709,14 +717,19 @@ def _add_callback_to_queue(self, request: CallbackRequest):
                 self.log.debug("Skipping already queued SlaCallbackRequest")
                 return
 
-            # not already queued, queue the file _at the back_, and add the request to the file's callbacks
+            # not already queued, queue the callback
+            # do NOT add the file of this SLA to self._file_path_queue. SLAs can arrive so rapidly that
+            # they keep adding to the file queue and never letting it drain. This in turn prevents us from
+            # ever rescanning the dags folder for changes to existing dags. We simply store the callback, and
+            # periodically, when self._file_path_queue is drained, we rescan and re-queue all DAG files.
+            # The SLAs will be picked up then. It means a delay in reacting to the SLAs (as controlled by the
+            # min_file_process_interval config) but stops SLAs from DoS'ing the queue.
             self.log.debug("Queuing SlaCallbackRequest for %s", request.dag_id)
             self._callback_to_execute[request.full_filepath].append(request)
-            if request.full_filepath not in self._file_path_queue:
-                self._file_path_queue.append(request.full_filepath)
+            Stats.incr("dag_processing.sla_callback_count")

Review Comment:
   This change is the fix - we don't add to the queue. Note we _do_ still add 
the callback itself, so when the file is processed for some other reason 
(typically the periodic refresh of all dag files) the callback will be picked 
up then.
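
   A hedged sketch of the resulting flow, using simplified stand-ins for the manager's state (the function names here are hypothetical):

   ```python
   import collections

   # Hypothetical stand-ins for DagFileProcessorManager attributes:
   _callback_to_execute: dict[str, list[str]] = collections.defaultdict(list)
   _file_path_queue: collections.deque[str] = collections.deque()

   def add_sla_callback(filepath: str, request: str) -> None:
       # Store the callback only; do NOT touch _file_path_queue, so a flood
       # of SLA callbacks can no longer keep the queue from ever draining.
       _callback_to_execute[filepath].append(request)

   def refill_queue(all_dag_files: list[str]) -> None:
       # Once the queue drains, the periodic rescan re-queues every DAG file;
       # stored callbacks are then executed as each file is processed.
       _file_path_queue.extend(all_dag_files)
   ```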



##########
airflow/dag_processing/manager.py:
##########
@@ -959,7 +973,15 @@ def set_file_paths(self, new_file_paths):
         :return: None
         """
         self._file_paths = new_file_paths
+
+        # clean up the queues; remove anything queued which is no longer in the list, including callbacks
         self._file_path_queue = collections.deque(x for x in self._file_path_queue if x in new_file_paths)
+        Stats.gauge("dag_processing.file_path_queue_size", len(self._file_path_queue))
+
+        callback_paths_to_del = list(x for x in self._callback_to_execute.keys() if x not in new_file_paths)

Review Comment:
   This just closes a small mem-leak; we removed the files from the queue, but 
we didn't remove their callbacks.
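
   Roughly what the cleanup amounts to (a self-contained sketch; the names match the diff above but the data is made up):

   ```python
   _callback_to_execute = {
       "/dags/kept.py": ["callback_a"],
       "/dags/deleted.py": ["callback_b"],  # file no longer in the DAGs folder
   }
   new_file_paths = ["/dags/kept.py"]

   # Materialize the stale keys first so the dict isn't mutated while being
   # iterated, which is why the diff wraps the generator in list(...).
   callback_paths_to_del = [p for p in _callback_to_execute if p not in new_file_paths]
   for path in callback_paths_to_del:
       del _callback_to_execute[path]

   assert "/dags/deleted.py" not in _callback_to_execute
   ```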



##########
airflow/dag_processing/manager.py:
##########
@@ -1202,6 +1226,15 @@ def _kill_timed_out_processors(self):
         for proc in processors_to_remove:
             self._processors.pop(proc)
 
+    def _add_paths_to_queue(self, file_paths_to_enqueue: list[str], add_at_front: bool = False):

Review Comment:
   I added this method since there were two places we added to the queue and 
both were virtually identical, except one added at the front, and one at the 
back. This also avoided repeating the update of the new metric tracking the 
queue size.
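
   For context, a sketch of what such a helper can look like (simplified; the de-duplication check is an assumption, and the metric call is shown as a comment to keep the snippet self-contained):

   ```python
   import collections

   _file_path_queue: collections.deque[str] = collections.deque()

   def _add_paths_to_queue(file_paths_to_enqueue: list[str], add_at_front: bool = False) -> None:
       """Single entry point for enqueueing, so the queue-size metric is updated in one place."""
       new_paths = [p for p in file_paths_to_enqueue if p not in _file_path_queue]
       if add_at_front:
           # extendleft() reverses its argument, so reverse first to keep the caller's order.
           _file_path_queue.extendleft(reversed(new_paths))
       else:
           _file_path_queue.extend(new_paths)
       # Stats.gauge("dag_processing.file_path_queue_size", len(_file_path_queue))
   ```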


