This is an automated email from the ASF dual-hosted git repository.
pankajkoti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c7e13061bd Send explicit task logs when marking tasks stuck in queued
as failed (#35857)
c7e13061bd is described below
commit c7e13061bd472962ed571cb0d33784900217660a
Author: Pankaj Koti <[email protected]>
AuthorDate: Sun Nov 26 14:49:51 2023 +0530
Send explicit task logs when marking tasks stuck in queued as failed
(#35857)
Using the feature built in #32646, when the scheduler marks tasks
stuck in queued as failed, send an explicit log message describing
the action to the task logs, so that users can identify exactly why
the task was marked as failed.
---------
Co-authored-by: Ephraim Anierobi <[email protected]>
---
airflow/jobs/scheduler_job_runner.py | 21 ++++++++++++---------
1 file changed, 12 insertions(+), 9 deletions(-)
diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index b9d18098f6..fdf5f12edc 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -1577,15 +1577,18 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
)
).all()
try:
- tis_for_warning_message = self.job.executor.cleanup_stuck_queued_tasks(tis=tasks_stuck_in_queued)
- if tis_for_warning_message:
- task_instance_str = "\n\t".join(tis_for_warning_message)
- self.log.warning(
- "Marked the following %s task instances stuck in queued as failed. "
- "If the task instance has available retries, it will be retried.\n\t%s",
- len(tasks_stuck_in_queued),
- task_instance_str,
- )
+ cleaned_up_task_instances = self.job.executor.cleanup_stuck_queued_tasks(
+ tis=tasks_stuck_in_queued
+ )
+ cleaned_up_task_instances = set(cleaned_up_task_instances)
+ for ti in tasks_stuck_in_queued:
+ if repr(ti) in cleaned_up_task_instances:
+ self._task_context_logger.warning(
+ "Marking task instance %s stuck in queued as failed. "
+ "If the task instance has available retries, it will be retried.",
+ ti,
+ ti=ti,
+ )
except NotImplementedError:
self.log.debug("Executor doesn't support cleanup of stuck queued tasks. Skipping.")
...