Chris7 commented on a change in pull request #21556:
URL: https://github.com/apache/airflow/pull/21556#discussion_r828050512



##########
File path: airflow/executors/celery_executor.py
##########
@@ -402,28 +404,39 @@ def _clear_stuck_queued_tasks(self, session: Session = 
NEW_SESSION) -> None:
             # We only want to do this for database backends where
             # this case has been spotted
             return
-        # We use this instead of using bulk_state_fetcher because we
-        # may not have the stuck task in self.tasks and we don't want
-        # to clear task in self.tasks too
-        session_ = app.backend.ResultSession()
-        task_cls = getattr(app.backend, "task_cls", TaskDb)
-        with session_cleanup(session_):
-            celery_task_ids = [
-                t.task_id
-                for t in session_.query(task_cls.task_id)
-                .filter(~task_cls.status.in_([celery_states.SUCCESS, 
celery_states.FAILURE]))
-                .all()
-            ]
         self.log.debug("Checking for stuck queued tasks")
 
         max_allowed_time = utcnow() - self.task_adoption_timeout
 
-        for task in session.query(TaskInstance).filter(
-            TaskInstance.state == State.QUEUED, TaskInstance.queued_dttm < 
max_allowed_time
-        ):
-            if task.key in self.queued_tasks or task.key in self.running:
-                continue
+        queued_too_log = (
+            session.query(TaskInstance)
+            .filter(TaskInstance.state == State.QUEUED, 
TaskInstance.queued_dttm < max_allowed_time)
+            .all()
+        )
 
+        if queued_too_log:
+            # We use this instead of using bulk_state_fetcher because we
+            # may not have the stuck task in self.tasks and we don't want
+            # to clear task in self.tasks too
+            session_ = app.backend.ResultSession()
+            task_cls = getattr(app.backend, "task_cls", TaskDb)
+            with session_cleanup(session_):
+                try:
+                    with timeout(seconds=15):
+                        celery_task_ids = [
+                            t.task_id
+                            for t in session_.query(task_cls.task_id)
+                            
.filter(~task_cls.status.in_([celery_states.SUCCESS, celery_states.FAILURE]))
+                            
.filter(task_cls.task_id.in_([ti.external_executor_id for ti in 
queued_too_log]))
+                            .all()
+                        ]
+                except Exception:
+                    # This timeout is not important, we should skip

Review comment:
      If this exception is hit, `celery_task_ids` will never be assigned, so the
later `if task.external_executor_id in celery_task_ids` check will raise a `NameError`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to