ashb commented on a change in pull request #19769:
URL: https://github.com/apache/airflow/pull/19769#discussion_r755115318

##########
File path: airflow/executors/celery_executor.py
##########
@@ -377,6 +390,36 @@ def _check_for_stalled_adopted_tasks(self):
             for key in timedout_keys:
                 self.change_state(key, State.FAILED)
 
+    @provide_session
+    def _clear_stuck_queued_tasks(self, session=None):
+        """
+        For some reason, tasks can get stuck in queued state in DB while still not in
+        self.queued_tasks and not in self.running.
+
+        In such a situation, we update the task instance state to scheduled so that
+        it can be queued again. We chose to use task_adoption_timeout to decide if a
+        queued task is stuck.
+        """
+        self.log.debug("Checking for stuck queued tasks")
+        queued_tasks = session.query(TaskInstance).filter(TaskInstance.state == State.QUEUED).all()
+
+        for task in queued_tasks:
+
+            self.log.info("Checking task %s", task)
+
+            if task.external_executor_id:
+                continue
+            if task.key in self.queued_tasks or task.key in self.running:
+                continue
+            # We use the task_adoption_timeout to decide if the task is stuck
+            if task.queued_dttm > utcnow() - datetime.timedelta(seconds=self.task_adoption_timeout):
+                continue

Review comment:
   You can do most of these checks at the SQL level and have the query return only the rows that should be cleared.

   But we probably need to _only_ do tasks owned by "this" executor.
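As a rough illustration of pushing these checks into SQL, here is a minimal, self-contained sketch. It uses the stdlib `sqlite3` module and a hypothetical, trimmed-down `task_instance` table instead of Airflow's real `TaskInstance` model and session. The column-only checks (state, `external_executor_id`, the `queued_dttm` cutoff) move into the `WHERE` clause, while the `queued_tasks`/`running` membership checks stay in Python because those collections exist only in executor memory. Treating "owned by this executor" as `queued_by_job_id` matching the scheduler job's id is an assumption, and the names `find_stuck_queued_keys` and `ts` are hypothetical.

```python
import sqlite3
from datetime import datetime, timedelta

# Hypothetical, trimmed-down stand-in for Airflow's task_instance table.
SCHEMA = """
CREATE TABLE task_instance (
    dag_id TEXT, task_id TEXT, run_id TEXT,
    state TEXT,
    external_executor_id TEXT,
    queued_dttm TEXT,            -- fixed-width UTC timestamp, see ts()
    queued_by_job_id INTEGER
)
"""

# Every check that only needs DB columns lives in the WHERE clause,
# so only candidate rows come back from the database.
STUCK_QUEUED_SQL = """
SELECT dag_id, task_id, run_id
FROM task_instance
WHERE state = 'queued'
  AND external_executor_id IS NULL   -- no Celery task id recorded yet
  AND queued_dttm <= ?               -- queued for at least the timeout
  AND queued_by_job_id = ?           -- assumption: ownership via job id
"""


def ts(dt: datetime) -> str:
    """Format a naive UTC datetime so that string order matches time order."""
    return dt.strftime("%Y-%m-%d %H:%M:%S")


def find_stuck_queued_keys(conn, job_id, adoption_timeout, queued_keys, running_keys, now):
    """Return (dag_id, task_id, run_id) keys of queued rows that look stuck."""
    cutoff = ts(now - adoption_timeout)
    rows = conn.execute(STUCK_QUEUED_SQL, (cutoff, job_id)).fetchall()
    # queued_keys / running_keys are in-memory executor state, so this last
    # filter cannot be expressed in SQL and stays in Python.
    return [key for key in rows if key not in queued_keys and key not in running_keys]


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute(SCHEMA)
    now = datetime(2021, 11, 24, 12, 0, 0)
    old, recent = ts(now - timedelta(hours=1)), ts(now - timedelta(minutes=1))
    conn.executemany(
        "INSERT INTO task_instance VALUES (?, ?, ?, ?, ?, ?, ?)",
        [
            ("d", "stuck", "r1", "queued", None, old, 1),
            ("d", "recent", "r1", "queued", None, recent, 1),
            ("d", "has_celery_id", "r1", "queued", "celery-abc", old, 1),
            ("d", "other_job", "r1", "queued", None, old, 2),
            ("d", "in_running", "r1", "queued", None, old, 1),
        ],
    )
    stuck = find_stuck_queued_keys(
        conn,
        job_id=1,
        adoption_timeout=timedelta(minutes=10),
        queued_keys=set(),
        running_keys={("d", "in_running", "r1")},
        now=now,
    )
    print(stuck)  # [('d', 'stuck', 'r1')]
```

In the executor itself the same filters would presumably be written with SQLAlchemy inside `session.query(TaskInstance).filter(...)`, e.g. `TaskInstance.external_executor_id.is_(None)` and `TaskInstance.queued_dttm <= cutoff`. The sketch takes the timeout as a `datetime.timedelta`; `celery_executor.py` appears to already store `task_adoption_timeout` as one, in which case wrapping it again in `datetime.timedelta(seconds=...)` would raise a `TypeError`.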

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.