uranusjr commented on a change in pull request #19769:
URL: https://github.com/apache/airflow/pull/19769#discussion_r756026593



##########
File path: airflow/executors/celery_executor.py
##########
@@ -377,6 +387,43 @@ def _check_for_stalled_adopted_tasks(self):
             for key in timedout_keys:
                 self.change_state(key, State.FAILED)
 
+    @provide_session
+    def _clear_stuck_queued_tasks(self, session=None):
+        """
+        For some reasons, tasks can get stuck in queued state in DB while 
still not in
+        self.queued_tasks and not in self.running_tasks.
+
+        In such situation, we update the task instance state to scheduled so 
that
+        it can be queued again. We chose to use task_adoption_timeout to decide
+        """
+        self.log.debug("Checking for stuck queued tasks")
+        queued_tasks = session.query(TaskInstance).filter(TaskInstance.state 
== State.QUEUED).all()
+
+        for task in queued_tasks:
+
+            self.log.info("Checking task %s", task)
+
+            if task.key in self.queued_tasks or task.key in self.running or 
not task.external_executor_id:
+                continue
+            # The pending state means task is waiting for execution or unknown
+            if AsyncResult(id=task.external_executor_id, app=app).state != 
'PENDING':
+                continue
+            # We use the task_adoption_timeout to decide if the task is stuck
+            max_allowed_time = utcnow() - self.task_adoption_timeout
+
+            if max_allowed_time > task.queued_dttm:
+                # Using the if condition above instead of adding to query so 
that the logging below is correct
+                self.log.info(
+                    'TaskInstance: %s found in queued state for more than %s 
seconds, rescheduling',
+                    task,
+                    self.task_adoption_timeout.total_seconds(),
+                )
+                session.query(TaskInstance).filter(
+                    TaskInstance.dag_id == task.dag_id,
+                    TaskInstance.task_id == task.task_id,
+                    TaskInstance.run_id == task.run_id,
+                ).update({TaskInstance.state: State.SCHEDULED})

Review comment:
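       Why not simply set `task.state = State.SCHEDULED`? `task` was already loaded through this same session, so assigning the attribute persists the change on flush/commit without issuing a second, separately filtered `UPDATE` query.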




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to