ashb commented on code in PR #23432:
URL: https://github.com/apache/airflow/pull/23432#discussion_r866976840


##########
airflow/executors/celery_executor.py:
##########
@@ -358,6 +369,71 @@ def _check_for_stalled_adopted_tasks(self):
             for key in timedout_keys:
                 self.change_state(key, State.FAILED)
 
+    @provide_session
+    def _clear_stuck_queued_tasks(self, session: Session = NEW_SESSION) -> 
None:
+        """
+        Tasks can get lost by celery if the celery worker is shut down while 
it is picking up
+        a new work item (observed when using redis broker). When this happens 
the task is stuck
+        in queued state indefinitely until the scheduler is restarted (which 
causes the task to be
+        picked up by the adoption code and re-scheduled at that point). This 
function is intended
+        to detect that situation and re-schedule those tasks without requiring 
a scheduler
+        restart. We chose to use task_adoption_timeout to decide when a queued 
task is considered
+        stuck and should be rescheduled.
+        """
+        self.log.debug("Checking for stuck queued tasks")
+        max_allowed_time = utcnow() - self.task_adoption_timeout
+        queued_too_long = (
+            session.query(TaskInstance)
+            .join(TaskInstance.dag_run)
+            .filter(
+                TaskInstance.state == State.QUEUED,
+                TaskInstance.queued_by_job_id == self.job_id,
+                DagRun.state == State.RUNNING,
+                TaskInstance.queued_dttm < max_allowed_time,
+            )
+            .all()
+        )
+        # this filtering is done after the query rather than in the query 
because the query result
+        # set should always be quite small (a few rows at most), and 
self.tasks could be relatively
+        # large (hundreds or possibly thousands) which could make for a long 
db query in extreme cases
+        queued_too_long = [ti for ti in queued_too_long if ti.key in 
self.tasks]
+        if not queued_too_long:
+            return
+
+        try:
+            with timeout(seconds=15):
+                latest_states = self.bulk_state_fetcher.get_many(
+                    [self.tasks[ti.key] for ti in queued_too_long]
+                )
+                queued_too_long = [
+                    ti
+                    for ti in queued_too_long
+                    if latest_states[ti.external_executor_id][0] == 
celery_states.PENDING
+                ]
+        except AirflowTaskTimeout:
+            # This "latest state" check isn't super important, so if it's 
taking too long we'll
+            # just log it and continue on
+            self.log.debug("Timed out while loading latest celery task states 
for stuck queued tasks")

Review Comment:
   If it times out then I don't think we should continue.



##########
airflow/config_templates/config.yml:
##########
@@ -1788,6 +1788,13 @@
       type: string
       example: ~
       default: "False"
+    - name: stuck_queued_task_check_interval

Review Comment:
   Does this need a new setting, or is the existing "orphan" settings  enough 
here?



##########
airflow/executors/celery_executor.py:
##########
@@ -358,6 +369,71 @@ def _check_for_stalled_adopted_tasks(self):
             for key in timedout_keys:
                 self.change_state(key, State.FAILED)
 
+    @provide_session
+    def _clear_stuck_queued_tasks(self, session: Session = NEW_SESSION) -> 
None:
+        """
+        Tasks can get lost by celery if the celery worker is shut down while 
it is picking up
+        a new work item (observed when using redis broker). When this happens 
the task is stuck
+        in queued state indefinitely until the scheduler is restarted (which 
causes the task to be
+        picked up by the adoption code and re-scheduled at that point). This 
function is intended
+        to detect that situation and re-schedule those tasks without requiring 
a scheduler
+        restart. We chose to use task_adoption_timeout to decide when a queued 
task is considered
+        stuck and should be rescheduled.
+        """
+        self.log.debug("Checking for stuck queued tasks")
+        max_allowed_time = utcnow() - self.task_adoption_timeout
+        queued_too_long = (
+            session.query(TaskInstance)
+            .join(TaskInstance.dag_run)
+            .filter(
+                TaskInstance.state == State.QUEUED,
+                TaskInstance.queued_by_job_id == self.job_id,
+                DagRun.state == State.RUNNING,
+                TaskInstance.queued_dttm < max_allowed_time,
+            )
+            .all()
+        )
+        # this filtering is done after the query rather than in the query 
because the query result
+        # set should always be quite small (a few rows at most), and 
self.tasks could be relatively
+        # large (hundreds or possibly thousands) which could make for a long 
db query in extreme cases
+        queued_too_long = [ti for ti in queued_too_long if ti.key in 
self.tasks]
+        if not queued_too_long:
+            return
+
+        try:
+            with timeout(seconds=15):
+                latest_states = self.bulk_state_fetcher.get_many(
+                    [self.tasks[ti.key] for ti in queued_too_long]
+                )
+                queued_too_long = [
+                    ti
+                    for ti in queued_too_long
+                    if latest_states[ti.external_executor_id][0] == 
celery_states.PENDING
+                ]
+        except AirflowTaskTimeout:
+            # This "latest state" check isn't super important, so if it's 
taking too long we'll
+            # just log it and continue on
+            self.log.debug("Timed out while loading latest celery task states 
for stuck queued tasks")
+
+        for task in queued_too_long:
+            self.log.info(
+                'TaskInstance: %s found in queued state for more than %s 
seconds, rescheduling',
+                task,
+                self.task_adoption_timeout.total_seconds(),
+            )
+            task.state = State.SCHEDULED
+            task.queued_dttm = None
+            session.merge(task)

Review Comment:
   ```suggestion
   ```
   
   Not needed. task is already attached to the session (from when we queried it 
on L385)



##########
airflow/executors/celery_executor.py:
##########
@@ -358,6 +369,71 @@ def _check_for_stalled_adopted_tasks(self):
             for key in timedout_keys:
                 self.change_state(key, State.FAILED)
 
+    @provide_session
+    def _clear_stuck_queued_tasks(self, session: Session = NEW_SESSION) -> 
None:
+        """
+        Tasks can get lost by celery if the celery worker is shut down while 
it is picking up
+        a new work item (observed when using redis broker). When this happens 
the task is stuck
+        in queued state indefinitely until the scheduler is restarted (which 
causes the task to be
+        picked up by the adoption code and re-scheduled at that point). This 
function is intended
+        to detect that situation and re-schedule those tasks without requiring 
a scheduler
+        restart. We chose to use task_adoption_timeout to decide when a queued 
task is considered
+        stuck and should be rescheduled.
+        """
+        self.log.debug("Checking for stuck queued tasks")
+        max_allowed_time = utcnow() - self.task_adoption_timeout
+        queued_too_long = (
+            session.query(TaskInstance)
+            .join(TaskInstance.dag_run)
+            .filter(
+                TaskInstance.state == State.QUEUED,
+                TaskInstance.queued_by_job_id == self.job_id,
+                DagRun.state == State.RUNNING,
+                TaskInstance.queued_dttm < max_allowed_time,
+            )
+            .all()
+        )
+        # this filtering is done after the query rather than in the query 
because the query result
+        # set should always be quite small (a few rows at most), and 
self.tasks could be relatively
+        # large (hundreds or possibly thousands) which could make for a long 
db query in extreme cases
+        queued_too_long = [ti for ti in queued_too_long if ti.key in 
self.tasks]
+        if not queued_too_long:
+            return
+
+        try:
+            with timeout(seconds=15):
+                latest_states = self.bulk_state_fetcher.get_many(
+                    [self.tasks[ti.key] for ti in queued_too_long]
+                )
+                queued_too_long = [
+                    ti
+                    for ti in queued_too_long
+                    if latest_states[ti.external_executor_id][0] == 
celery_states.PENDING
+                ]
+        except AirflowTaskTimeout:
+            # This "latest state" check isn't super important, so if it's 
taking too long we'll
+            # just log it and continue on
+            self.log.debug("Timed out while loading latest celery task states 
for stuck queued tasks")
+
+        for task in queued_too_long:
+            self.log.info(
+                'TaskInstance: %s found in queued state for more than %s 
seconds, rescheduling',
+                task,
+                self.task_adoption_timeout.total_seconds(),

Review Comment:
   Wrong timer.



##########
airflow/executors/celery_executor.py:
##########
@@ -358,6 +369,71 @@ def _check_for_stalled_adopted_tasks(self):
             for key in timedout_keys:
                 self.change_state(key, State.FAILED)
 
+    @provide_session
+    def _clear_stuck_queued_tasks(self, session: Session = NEW_SESSION) -> 
None:
+        """
+        Tasks can get lost by celery if the celery worker is shut down while 
it is picking up
+        a new work item (observed when using redis broker). When this happens 
the task is stuck
+        in queued state indefinitely until the scheduler is restarted (which 
causes the task to be
+        picked up by the adoption code and re-scheduled at that point). This 
function is intended
+        to detect that situation and re-schedule those tasks without requiring 
a scheduler
+        restart. We chose to use task_adoption_timeout to decide when a queued 
task is considered
+        stuck and should be rescheduled.
+        """
+        self.log.debug("Checking for stuck queued tasks")
+        max_allowed_time = utcnow() - self.task_adoption_timeout
+        queued_too_long = (
+            session.query(TaskInstance)
+            .join(TaskInstance.dag_run)
+            .filter(
+                TaskInstance.state == State.QUEUED,
+                TaskInstance.queued_by_job_id == self.job_id,
+                DagRun.state == State.RUNNING,
+                TaskInstance.queued_dttm < max_allowed_time,
+            )
+            .all()
+        )
+        # this filtering is done after the query rather than in the query 
because the query result
+        # set should always be quite small (a few rows at most), and 
self.tasks could be relatively
+        # large (hundreds or possibly thousands) which could make for a long 
db query in extreme cases
+        queued_too_long = [ti for ti in queued_too_long if ti.key in 
self.tasks]
+        if not queued_too_long:
+            return
+
+        try:
+            with timeout(seconds=15):
+                latest_states = self.bulk_state_fetcher.get_many(
+                    [self.tasks[ti.key] for ti in queued_too_long]
+                )
+                queued_too_long = [
+                    ti
+                    for ti in queued_too_long
+                    if latest_states[ti.external_executor_id][0] == 
celery_states.PENDING
+                ]
+        except AirflowTaskTimeout:
+            # This "latest state" check isn't super important, so if it's 
taking too long we'll
+            # just log it and continue on
+            self.log.debug("Timed out while loading latest celery task states 
for stuck queued tasks")
+
+        for task in queued_too_long:

Review Comment:
   ```suggestion
           for ti in queued_too_long:
   ```
   
   etc.
   
   Naming is important -- this is a TaskInstance, not a Task



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to