jedcunningham commented on code in PR #30375:
URL: https://github.com/apache/airflow/pull/30375#discussion_r1166052103


##########
newsfragments/30375.significant.rst:
##########
@@ -0,0 +1,9 @@
+Simplify logic to resolve tasks stuck in queued despite stalled_task_timeout

Review Comment:
   ```suggestion
   Consolidate handling of tasks stuck in queued under new ``task_queued_timeout`` config
   ```
   
   This is probably a better title?



##########
newsfragments/30375.significant.rst:
##########
@@ -0,0 +1,9 @@
+Simplify logic to resolve tasks stuck in queued despite stalled_task_timeout
+
+Logic for handling tasks stuck in the queued state has been consolidated, and the all configurations
+responsible for timing out stuck queued tasks have been deprecated and merged into
+`scheduler.task_queued_timeout`. The configurations that have been deprecated are
+`kubernetes.worker_pods_pending_timeout`, `celery.stalled_task_timeout`, and
+`celery.task_adoption_timeout`. If any of these configurations are set, the longest timeout will be
+respected. For example, if `celery.stalled_task_timeout` is 1200, and `scheduler.task_queued_timeout`
+is 600, Airflow will set `scheduler.task_queued_timeout` to 1200.

Review Comment:
   ```suggestion
   Logic for handling tasks stuck in the queued state has been consolidated, and all the configurations
   responsible for timing out stuck queued tasks have been deprecated and merged into
   ``scheduler.task_queued_timeout``. The configurations that have been deprecated are
   ``kubernetes.worker_pods_pending_timeout``, ``celery.stalled_task_timeout``, and
   ``celery.task_adoption_timeout``. If any of these configurations are set, the longest timeout will be
   respected. For example, if ``celery.stalled_task_timeout`` is 1200, and ``scheduler.task_queued_timeout``
   is 600, Airflow will set ``scheduler.task_queued_timeout`` to 1200.
   
   ```
   
   Fixes static checks.
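
   To illustrate the "longest timeout wins" rule described in this fragment, here is a minimal sketch; the `_resolve_task_queued_timeout` helper and its fallback of 600 are illustrative only, not code from this PR:

   ```python
   from airflow.configuration import conf

   # Hypothetical sketch: fold the deprecated timeouts into
   # [scheduler] task_queued_timeout. The longest configured value wins.
   def _resolve_task_queued_timeout() -> float:
       candidates = [conf.getfloat("scheduler", "task_queued_timeout", fallback=600.0)]
       for section, key in [
           ("kubernetes", "worker_pods_pending_timeout"),
           ("celery", "stalled_task_timeout"),
           ("celery", "task_adoption_timeout"),
       ]:
           value = conf.getfloat(section, key, fallback=0.0)
           if value:
               candidates.append(value)
       # e.g. stalled_task_timeout=1200 and task_queued_timeout=600 -> 1200
       return max(candidates)
   ```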



##########
newsfragments/30375.significant.rst:
##########
@@ -0,0 +1,9 @@
+Simplify logic to resolve tasks stuck in queued despite stalled_task_timeout
+
+Logic for handling tasks stuck in the queued state has been consolidated, and the all configurations
+responsible for timing out stuck queued tasks have been deprecated and merged into
+`scheduler.task_queued_timeout`. The configurations that have been deprecated are

Review Comment:
   ```suggestion
   `[scheduler] task_queued_timeout`. The configurations that have been deprecated are
   ```
   
   Let's reference these config options like this instead.
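
   For context, `[scheduler] task_queued_timeout` refers to the `task_queued_timeout` key in the `[scheduler]` section of `airflow.cfg`. A quick sketch of reading it programmatically, assuming the option lands under that section as the fragment describes:

   ```python
   from airflow.configuration import conf

   # "[scheduler] task_queued_timeout" is the task_queued_timeout key in the
   # [scheduler] section of airflow.cfg (or the env var
   # AIRFLOW__SCHEDULER__TASK_QUEUED_TIMEOUT).
   timeout = conf.getfloat("scheduler", "task_queued_timeout")
   ```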



##########
airflow/executors/base_executor.py:
##########
@@ -376,6 +376,18 @@ def terminate(self):
         """This method is called when the daemon receives a SIGTERM."""
         raise NotImplementedError()
 
+    def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:  # pragma: no cover
+        """
+        Handle remnants of tasks that were failed because they were stuck in queued.
+        Tasks can get stuck in queued. If such a task is detected, it will be marked
+        as `UP_FOR_RETRY` if the task instance has remaining retries or marked as `FAILED`
+        if it doesn't.
+
+        :param tis: List of Task Instances to clean up
+        :return: List of readable task instances for a warning message
+        """
+        raise NotImplementedError()

Review Comment:
   I just realized we haven't properly handled what happens when another executor hits this (however unlikely).

   An easy way to reproduce this would be to set a really low task_queued_timeout and add a long sleep at the top level of a DAG while using LocalExecutor.

   Maybe this should just log and call fail as well? It shouldn't kill the scheduler, I'd think.
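
   One possible shape for that non-fatal fallback, sketched under the assumption that `BaseExecutor.fail()` and `self.log` keep their current behavior; `LenientExecutorMixin` is a made-up name, not part of this PR:

   ```python
   from __future__ import annotations

   from airflow.executors.base_executor import BaseExecutor
   from airflow.models.taskinstance import TaskInstance


   class LenientExecutorMixin(BaseExecutor):
       """Hypothetical default: warn and fail stuck queued tasks instead of raising."""

       def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
           self.log.warning(
               "%s has no special handling for tasks stuck in queued; failing them.",
               type(self).__name__,
           )
           readable_tis = []
           for ti in tis:
               readable_tis.append(repr(ti))
               self.fail(ti.key)  # record a FAILED event rather than killing the scheduler
           return readable_tis
   ```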



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
