potiuk commented on code in PR #25147:
URL: https://github.com/apache/airflow/pull/25147#discussion_r926379045


##########
airflow/dag_processing/manager.py:
##########
@@ -679,16 +679,37 @@ def _fetch_callbacks(self, max_callbacks: int, session: Session = NEW_SESSION):
             guard.commit()
 
     def _add_callback_to_queue(self, request: CallbackRequest):
-        self._callback_to_execute[request.full_filepath].append(request)
-        # Callback has a higher priority over DAG Run scheduling
-        if request.full_filepath in self._file_path_queue:
-            # Remove file paths matching request.full_filepath from self._file_path_queue
-            # Since we are already going to use that filepath to run callback,
-            # there is no need to have same file path again in the queue
-            self._file_path_queue = [
-                file_path for file_path in self._file_path_queue if file_path != request.full_filepath
-            ]
-        self._file_path_queue.insert(0, request.full_filepath)
+
+        # requests are sent by dag processors. SLAs exist per-dag, but can be generated once per SLA-enabled
+        # task in the dag. If treated like other callbacks, SLAs can cause feedback where a SLA arrives,
+        # goes to the front of the queue, gets processed, triggers more SLAs from the same DAG, which go to
+        # the front of the queue, and we never get round to picking stuff off the back of the queue
+        if isinstance(request, SlaCallbackRequest):
+            if request in self._callback_to_execute[request.full_filepath]:
+                self.log.debug("Skipping already queued SlaCallbackRequest")
+                return
+
+            # not already queued, queue the file _at the back_, and add the request to the file's callbacks
+            self.log.debug("Queuing SlaCallbackRequest for %s", request.dag_id)
+            self._callback_to_execute[request.full_filepath].append(request)
+            if request.full_filepath not in self._file_path_queue:
+                self._file_path_queue.append(request.full_filepath)
+
+        # Other callbacks have a higher priority over DAG Run scheduling, so those callbacks gazump, even if
+        # already in the queue
+        else:
+            self.log.debug(
+                "Queuing %s CallbackRequest: %s", type(request).__name__.rsplit('.', 1)[-1], request

Review Comment:
   Small thing here: the `rsplit` will be evaluated every time we go through that line, no matter the log level. We should avoid that.
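A minimal sketch of the point above, using only the standard library. `logging` defers the `%`-formatting of the message, but every argument is still evaluated at the call site before `debug()` checks the level, so the `rsplit` runs even with DEBUG off. Two cheap fixes are guarding the call with `Logger.isEnabledFor(logging.DEBUG)`, or dropping the `rsplit` altogether: `type(request).__name__` is already the unqualified class name (nesting only shows up in `__qualname__`), so the split never removes anything. `FakeRequest`, `short_name()` and the call counter are hypothetical helpers that exist only to make the evaluation visible.

```python
import logging

log = logging.getLogger("callback_queue_demo")  # hypothetical logger name
log.setLevel(logging.INFO)  # DEBUG disabled, as in a typical deployment

calls = {"short_name": 0}


class FakeRequest:
    """Hypothetical stand-in for a CallbackRequest."""


def short_name(obj) -> str:
    # Counts how often the "expensive" argument is actually computed.
    calls["short_name"] += 1
    return type(obj).__name__.rsplit(".", 1)[-1]


def queue_eager(request) -> None:
    # The argument is evaluated before debug() ever checks the level.
    log.debug("Queuing %s CallbackRequest: %s", short_name(request), request)


def queue_guarded(request) -> None:
    # isEnabledFor() skips building the arguments when DEBUG is off.
    if log.isEnabledFor(logging.DEBUG):
        log.debug("Queuing %s CallbackRequest: %s", short_name(request), request)


def queue_plain(request) -> None:
    # type(...).__name__ is a cheap attribute lookup; no rsplit needed.
    log.debug("Queuing %s CallbackRequest: %s", type(request).__name__, request)


req = FakeRequest()
queue_eager(req)
queue_guarded(req)
queue_plain(req)
print(calls["short_name"])  # 1: only the eager call paid for short_name()
```

Of the two, passing `type(request).__name__` directly is probably the simpler change; the `isEnabledFor()` guard mainly pays off when an argument is genuinely expensive to build.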

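The queueing policy described in the diff's comments can also be modelled in isolation. This is a simplified sketch, not Airflow's code: `SlaRequest`, `OtherRequest` and `CallbackQueue` are hypothetical stand-ins, equality between requests comes from `@dataclass`, and the non-SLA branch reuses the pre-change logic from the removed lines (drop the file path, then put it at the front), because the new `else` branch is cut off in the hunk above.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class SlaRequest:
    """Hypothetical stand-in for SlaCallbackRequest."""
    full_filepath: str
    dag_id: str


@dataclass(frozen=True)
class OtherRequest:
    """Hypothetical stand-in for any non-SLA CallbackRequest."""
    full_filepath: str
    msg: str


class CallbackQueue:
    def __init__(self) -> None:
        self.callbacks = defaultdict(list)  # file path -> pending requests
        self.file_path_queue: list[str] = []

    def add(self, request) -> None:
        path = request.full_filepath
        if isinstance(request, SlaRequest):
            # Deduplicate: an identical SLA request is already pending.
            if request in self.callbacks[path]:
                return
            self.callbacks[path].append(request)
            # Queue at the back so SLAs cannot starve other files.
            if path not in self.file_path_queue:
                self.file_path_queue.append(path)
        else:
            # Other callbacks jump the queue, even if the file is already queued.
            self.callbacks[path].append(request)
            self.file_path_queue = [p for p in self.file_path_queue if p != path]
            self.file_path_queue.insert(0, path)


q = CallbackQueue()
q.file_path_queue = ["a.py", "b.py"]  # files waiting for normal parsing
q.add(SlaRequest("c.py", "dag_c"))
q.add(SlaRequest("c.py", "dag_c"))  # duplicate, ignored
q.add(OtherRequest("b.py", "task failed"))
print(q.file_path_queue)  # ['b.py', 'a.py', 'c.py']
print(len(q.callbacks["c.py"]))  # 1
```

The asymmetry is the point of the change: an SLA request never moves a file that is already queued, while any other callback always moves its file to the front, so a burst of SLA callbacks from one DAG file can no longer keep pushing that file ahead of everything else.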


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.