jedcunningham commented on code in PR #30375:
URL: https://github.com/apache/airflow/pull/30375#discussion_r1165739839
##########
airflow/executors/celery_executor.py:
##########
@@ -543,22 +417,28 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task
         return not_adopted_tis
-    def _set_celery_pending_task_timeout(
-        self, key: TaskInstanceKey, timeout_type: _CeleryPendingTaskTimeoutType | None
-    ) -> None:
+    def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
         """
-        Set pending task timeout.
+        Handle remnants of tasks that were failed because they were stuck in queued.
+        Tasks can get stuck in queued. If such a task is detected, it will be marked
+        as `UP_FOR_RETRY` if the task instance has remaining retries or marked as `FAILED`
+        if it doesn't.
-        We use the fact that dicts maintain insertion order, and the the timeout for a
-        task is always "now + delta" to maintain the property that oldest item = first to
-        time out.
+        :param tis: List of Task Instances to clean up
+        :return: List of readable task instances for a warning message
         """
-        self.adopted_task_timeouts.pop(key, None)
-        self.stalled_task_timeouts.pop(key, None)
-        if timeout_type == _CeleryPendingTaskTimeoutType.ADOPTED and self.task_adoption_timeout:
-            self.adopted_task_timeouts[key] = utcnow() + self.task_adoption_timeout
-        elif timeout_type == _CeleryPendingTaskTimeoutType.STALLED and self.stalled_task_timeout:
-            self.stalled_task_timeouts[key] = utcnow() + self.stalled_task_timeout
+        readable_tis = []
+        for ti in tis:
+            readable_tis.append(repr(ti))
+            task_instance_key = ti.key
Review Comment:
Okay, so I had a chat with @ephraimbuddy and I think we should give this a
try instead: remove the task from `running` and add the failure to
`event_buffer` explicitly. In theory, we then don't have to set the task as
failed here.
This is essentially what the KubernetesExecutor (KE) does indirectly.
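A minimal standalone sketch of the suggestion, written so it runs without Airflow. Only `cleanup_stuck_queued_tasks`, `running` and `event_buffer` come from the diff and the comment; `SketchExecutor`, `TIKey`, `StuckTI`, `State`, `retries_left` and `next_state` are hypothetical stand-ins. The executor stops tracking each stuck task and reports `FAILED` through its event buffer. The retry-or-fail rule from the docstring is then applied by whatever consumes that buffer (presumably the scheduler), modeled here by the hypothetical `next_state`.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from enum import Enum


class State(str, Enum):
    # Hypothetical stand-in for Airflow's task instance states.
    QUEUED = "queued"
    UP_FOR_RETRY = "up_for_retry"
    FAILED = "failed"


@dataclass(frozen=True)
class TIKey:
    # Hypothetical stand-in for TaskInstanceKey (frozen, so hashable).
    dag_id: str
    task_id: str
    run_id: str
    try_number: int = 1


@dataclass
class StuckTI:
    # Hypothetical stand-in for a TaskInstance stuck in queued.
    key: TIKey
    retries_left: int

    def __repr__(self) -> str:
        k = self.key
        return f"<TI {k.dag_id}.{k.task_id} {k.run_id} try={k.try_number}>"


@dataclass
class SketchExecutor:
    # The two pieces of executor state named in the review comment.
    running: set[TIKey] = field(default_factory=set)
    event_buffer: dict[TIKey, tuple[State, str | None]] = field(default_factory=dict)

    def cleanup_stuck_queued_tasks(self, tis: list[StuckTI]) -> list[str]:
        readable_tis = []
        for ti in tis:
            readable_tis.append(repr(ti))
            # Stop tracking the task as running (no error if already gone).
            self.running.discard(ti.key)
            # Report the failure instead of setting the TI state directly.
            self.event_buffer[ti.key] = (State.FAILED, "stuck in queued")
        return readable_tis


def next_state(ti: StuckTI) -> State:
    # Consumer side (hypothetical): retry if attempts remain, else fail.
    return State.UP_FOR_RETRY if ti.retries_left > 0 else State.FAILED
```

The design point is that the executor no longer decides between `UP_FOR_RETRY` and `FAILED`; it only reports what happened, so the retry logic lives in one place.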
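For context on what the diff removes: the `_set_celery_pending_task_timeout` docstring relies on dict insertion order plus a fixed "now + delta" deadline, so the oldest entry is always the first to time out. A minimal sketch of that idea, assuming a constant delta and a clock that never goes backwards; `PendingTimeouts` and its string keys are hypothetical stand-ins for the executor's timeout dicts and `TaskInstanceKey`.

```python
from __future__ import annotations

from datetime import datetime, timedelta


class PendingTimeouts:
    """Hypothetical helper: key -> deadline, kept oldest-deadline-first."""

    def __init__(self, delta: timedelta) -> None:
        self.delta = delta
        self.deadlines: dict[str, datetime] = {}

    def set(self, key: str, now: datetime) -> None:
        # Pop first: assigning to an existing key keeps its old position,
        # which would break "first inserted = first to expire".
        self.deadlines.pop(key, None)
        self.deadlines[key] = now + self.delta

    def expired(self, now: datetime) -> list[str]:
        # Deadlines are non-decreasing in insertion order, so we can stop
        # scanning at the first one that is still in the future.
        out = []
        for key, deadline in self.deadlines.items():
            if deadline > now:
                break
            out.append(key)
        # Delete after iterating to avoid mutating the dict mid-loop.
        for key in out:
            del self.deadlines[key]
        return out
```

This is why the removed code pops the key from both dicts before re-inserting it: a refreshed timeout must move to the end of the dict for the ordering property to hold.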
--
This is an automated message from the Apache Git Service.