tanelk commented on code in PR #23432:
URL: https://github.com/apache/airflow/pull/23432#discussion_r863427637


##########
airflow/executors/celery_executor.py:
##########
@@ -358,6 +369,64 @@ def _check_for_stalled_adopted_tasks(self):
             for key in timedout_keys:
                 self.change_state(key, State.FAILED)
 
+    @provide_session
+    def _clear_stuck_queued_tasks(self, session: Session = NEW_SESSION) -> None:
+        """
+        Tasks can get lost by celery if the celery worker is shut down while it is picking up
+        a new work item (observed when using redis broker). When this happens the task is stuck
+        in queued state indefinitely until the scheduler is restarted (which causes the task to be
+        picked up by the adoption code and re-scheduled at that point). This function is intended
+        to detect that situation and re-schedule those tasks without requiring a scheduler
+        restart. We chose to use task_adoption_timeout to decide when a queued task is considered
+        stuck and should be rescheduled.
+        """
+        self.log.debug("Checking for stuck queued tasks")
+        max_allowed_time = utcnow() - self.task_adoption_timeout
+        queued_too_long = (
+            session.query(TaskInstance)
+            .join(TaskInstance.dag_run)
+            .filter(
+                TaskInstance.state == State.QUEUED,
+                DagRun.state == State.RUNNING,
+                TaskInstance.queued_dttm < max_allowed_time,
+            )
+            .all()
+        )
+        # this filtering is done after the query rather than in the query because the query result
+        # set should always be quite small (a few rows at most), and self.tasks could be relatively
+        # large (hundreds or possibly thousands) which could make for a long db query in extreme cases

Review Comment:
   Could we filter on `queued_by_job_id` in the query?
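
   To illustrate the idea, here is a minimal sketch using only the standard-library `sqlite3` module and a hypothetical, stripped-down `task_instance` table. It is not Airflow's real schema or ORM code, and the helper name `find_stuck_queued` is made up for this example. The point is that a single equality test on `queued_by_job_id` keeps the query small no matter how large `self.tasks` is, so the ownership check could probably live in the query itself.

```python
import sqlite3
from datetime import datetime, timedelta, timezone


def find_stuck_queued(conn, job_id, max_allowed_time):
    """Return (dag_id, task_id) for tasks queued by `job_id` before `max_allowed_time`.

    Hypothetical helper: the ownership filter is a single equality on
    queued_by_job_id, so no large IN (...) list of task keys is needed.
    """
    return conn.execute(
        "SELECT dag_id, task_id FROM task_instance "
        "WHERE state = 'queued' "
        "AND queued_by_job_id = ? "
        "AND queued_dttm < ? "
        "ORDER BY dag_id, task_id",
        (job_id, max_allowed_time.isoformat()),
    ).fetchall()


# Toy table with only the columns this sketch needs (assumed, simplified schema).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance ("
    "dag_id TEXT, task_id TEXT, state TEXT, "
    "queued_by_job_id INTEGER, queued_dttm TEXT)"
)

now = datetime(2022, 5, 3, 12, 0, tzinfo=timezone.utc)
old = (now - timedelta(hours=1)).isoformat()
recent = (now - timedelta(seconds=30)).isoformat()

conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?, ?)",
    [
        ("dag_a", "t1", "queued", 1, old),      # queued long ago by job 1
        ("dag_a", "t2", "queued", 2, old),      # queued long ago by another job
        ("dag_b", "t1", "queued", 1, recent),   # queued by job 1, but only just now
        ("dag_b", "t2", "running", 1, old),     # job 1, but no longer queued
    ],
)

# Timestamps share one UTC ISO-8601 format, so string comparison orders them correctly.
stuck = find_stuck_queued(conn, job_id=1, max_allowed_time=now - timedelta(minutes=10))
print(stuck)
```

   In the PR itself this would presumably be one more condition in the existing `.filter(...)` call, comparing `TaskInstance.queued_by_job_id` with the executor's job id. I have not checked that the executor's job id is always set at this point, so treat that as an assumption.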



