ephraimbuddy commented on a change in pull request #19769:
URL: https://github.com/apache/airflow/pull/19769#discussion_r756022501



##########
File path: airflow/executors/celery_executor.py
##########
@@ -377,6 +390,36 @@ def _check_for_stalled_adopted_tasks(self):
             for key in timedout_keys:
                 self.change_state(key, State.FAILED)
 
+    @provide_session
+    def _clear_stuck_queued_tasks(self, session=None):
+        """
+        For some reason, tasks can get stuck in queued state in the DB while still not in
+        self.queued_tasks and not in self.running.
+
+        In such a situation, we update the task instance state to scheduled so that
+        it can be queued again. We chose to use task_adoption_timeout to decide if a task is stuck.
+        """
+        self.log.debug("Checking for stuck queued tasks")
+        queued_tasks = session.query(TaskInstance).filter(TaskInstance.state == State.QUEUED).all()
+
+        for task in queued_tasks:
+
+            self.log.info("Checking task %s", task)
+
+            if task.external_executor_id:
+                continue
+            if task.key in self.queued_tasks or task.key in self.running:
+                continue
+            # We use the task_adoption_timeout to decide if the task is stuck
+            if task.queued_dttm > utcnow() - datetime.timedelta(seconds=self.task_adoption_timeout):
+                continue
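
The three skip rules in this hunk (external id present, still tracked in memory, queued too recently) can be exercised without a database. This is a minimal sketch, not the PR's code: `QueuedTI` and `find_stuck_queued` are hypothetical names, and the timeout is taken in seconds to mirror `timedelta(seconds=self.task_adoption_timeout)`.

```python
import datetime
from dataclasses import dataclass
from typing import Hashable, Iterable, List, Optional, Set


@dataclass
class QueuedTI:
    """Hypothetical stand-in for a TaskInstance row in the QUEUED state."""

    key: Hashable
    queued_dttm: datetime.datetime
    external_executor_id: Optional[str] = None


def find_stuck_queued(
    tis: Iterable[QueuedTI],
    queued_keys: Set[Hashable],
    running_keys: Set[Hashable],
    adoption_timeout_seconds: float,
    now: datetime.datetime,
) -> List[Hashable]:
    """Return the keys that survive the hunk's skip rules (candidates to reschedule)."""
    cutoff = now - datetime.timedelta(seconds=adoption_timeout_seconds)
    stuck = []
    for ti in tis:
        # Skip rows that already carry an external (Celery) task id.
        if ti.external_executor_id:
            continue
        # Skip rows this executor still tracks as queued or running.
        if ti.key in queued_keys or ti.key in running_keys:
            continue
        # Skip rows queued more recently than the adoption timeout.
        if ti.queued_dttm > cutoff:
            continue
        stuck.append(ti.key)
    return stuck


# Usage with made-up rows and a fixed clock.
now = datetime.datetime(2021, 11, 24, 12, 0, tzinfo=datetime.timezone.utc)


def ago(minutes: float) -> datetime.datetime:
    return now - datetime.timedelta(minutes=minutes)


tis = [
    QueuedTI("old", ago(15)),
    QueuedTI("fresh", ago(0.5)),
    QueuedTI("has_celery_id", ago(15), external_executor_id="abc"),
    QueuedTI("in_memory", ago(15)),
]
result = find_stuck_queued(tis, {"in_memory"}, set(), 600, now)
print(result)  # ['old']
```

Only `old` is returned: it has no external id, is not tracked in memory, and was queued before the 600-second cutoff.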

Review comment:
       Since the task instance's external_executor_id is no longer available in
       Celery, is there a way to get only the tasks that belong to this executor?
       Currently I'm searching broadly, using AsyncResult to check whether another
       executor is running this task.
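
One point that may matter for the question above: Celery documents PENDING as the state reported for any task id it does not know, so a broad `AsyncResult(...).state` lookup cannot distinguish a lost task from one still waiting in the broker. The sketch below rests on two assumptions that this thread does not confirm: that rows can be scoped to this executor by the scheduler job that queued them (Airflow 2.x's TaskInstance model has a `queued_by_job_id` column that might serve), and that the Celery state lookup is passed in as a callable. `QueuedRow`, `unaccounted_for`, and `lookup_state` are hypothetical names.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, List, Optional

# A subset of Celery's built-in task state names in which Celery clearly knows the task.
IN_FLIGHT = {"RECEIVED", "STARTED", "RETRY"}
FINISHED = {"SUCCESS", "FAILURE", "REVOKED"}


@dataclass
class QueuedRow:
    """Hypothetical minimal view of a queued task instance."""

    key: str
    queued_by_job_id: Optional[int]
    external_executor_id: Optional[str]


def unaccounted_for(
    rows: Iterable[QueuedRow],
    my_job_id: int,
    lookup_state: Callable[[str], Optional[str]],
) -> List[str]:
    """Return keys of rows queued by this job that Celery cannot vouch for."""
    result = []
    for row in rows:
        # Assumed scoping rule: only consider rows this scheduler job queued.
        if row.queued_by_job_id != my_job_id:
            continue
        if row.external_executor_id is None:
            # No Celery id recorded, so there is nothing to look up.
            result.append(row.key)
            continue
        state = lookup_state(row.external_executor_id)
        if state in IN_FLIGHT or state in FINISHED:
            # Celery has a record of this task; leave it to normal state sync.
            continue
        # Anything else, notably PENDING, is treated as unaccounted for:
        # Celery reports PENDING for unknown ids as well as for queued ones.
        result.append(row.key)
    return result


# Usage with a dict standing in for the result backend.
states = {"c1": "STARTED", "c2": "PENDING", "c3": "SUCCESS"}
rows = [
    QueuedRow("a", 1, "c1"),
    QueuedRow("b", 1, "c2"),
    QueuedRow("c", 1, "c3"),
    QueuedRow("d", 1, None),
    QueuedRow("e", 2, "c2"),
]
print(unaccounted_for(rows, 1, states.get))  # ['b', 'd']
```

With Celery installed, `lookup_state` could be `lambda task_id: AsyncResult(task_id, app=app).state` using `celery.result.AsyncResult` and a Celery app object `app`; injecting it keeps the triage logic testable without a broker. Because PENDING is ambiguous, rows returned here would still need a timeout check like the one in the hunk before being reset.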




-- 
This is an automated message from the Apache Git Service.