repl-chris commented on code in PR #23432:
URL: https://github.com/apache/airflow/pull/23432#discussion_r867199084
##########
airflow/executors/celery_executor.py:
##########
@@ -358,6 +369,71 @@ def _check_for_stalled_adopted_tasks(self):
for key in timedout_keys:
self.change_state(key, State.FAILED)
+ @provide_session
+ def _clear_stuck_queued_tasks(self, session: Session = NEW_SESSION) ->
None:
+ """
+ Tasks can get lost by celery if the celery worker is shut down while
it is picking up
+ a new work item (observed when using redis broker). When this happens
the task is stuck
+ in queued state indefinitely until the scheduler is restarted (which
causes the task to be
+ picked up by the adoption code and re-scheduled at that point). This
function is intended
+ to detect that situation and re-schedule those tasks without requiring
a scheduler
+ restart. We chose to use task_adoption_timeout to decide when a queued
task is considered
+ stuck and should be rescheduled.
+ """
+ self.log.debug("Checking for stuck queued tasks")
+ max_allowed_time = utcnow() - self.task_adoption_timeout
+ queued_too_long = (
+ session.query(TaskInstance)
+ .join(TaskInstance.dag_run)
+ .filter(
+ TaskInstance.state == State.QUEUED,
+ TaskInstance.queued_by_job_id == self.job_id,
+ DagRun.state == State.RUNNING,
+ TaskInstance.queued_dttm < max_allowed_time,
+ )
+ .all()
+ )
+ # this filtering is done after the query rather than in the query
because the query result
+ # set should always be quite small (a few rows at most), and
self.tasks could be relatively
+ # large (hundreds or possibly thousands) which could make for a long
db query in extreme cases
+ queued_too_long = [ti for ti in queued_too_long if ti.key in
self.tasks]
+ if not queued_too_long:
+ return
+
+ try:
+ with timeout(seconds=15):
Review Comment:
No, the UI isn't complaining at all. As for the timeout, I've never seen it —
I only put that code in because you had it in the original PR here:
https://github.com/apache/airflow/pull/21556/commits/32d7060eb9927e14ada5f739d372a688538cf82b
...but I guess I did change the query to be a decent amount faster
than the previous implementation, so I really doubt it's necessary anymore.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]