vincbeck commented on code in PR #40867:
URL: https://github.com/apache/airflow/pull/40867#discussion_r1683310982
##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1649,11 +1658,15 @@ def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None:
                 cleaned_up_task_instances = set(executor.cleanup_stuck_queued_tasks(tis=stuck_tis))
                 for ti in stuck_tis:
                     if repr(ti) in cleaned_up_task_instances:
-                        self._task_context_logger.warning(
-                            "Marking task instance %s stuck in queued as failed. "
-                            "If the task instance has available retries, it will be retried.",
-                            ti,
-                            ti=ti,
+                        session.add(
+                            Log(
+                                event="stuck in queued",
Review Comment:
```suggestion
event="task stuck in queued",
```
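Minor, but a descriptive event value pays off later: these strings land in the `event` column of the audit log, and that is what people filter on. A rough sketch of the kind of query I have in mind, assuming the `Log` model from `airflow.models.log` and the event string suggested above (names here are illustrative, not part of this PR):
```python
# Illustrative only: count "stuck in queued" occurrences for one DAG from the
# audit log. Assumes Airflow's Log model and the create_session helper; the
# event string mirrors the suggestion above.
from __future__ import annotations

from airflow.models.log import Log
from airflow.utils.session import create_session


def count_stuck_in_queued(dag_id: str) -> int:
    with create_session() as session:
        return (
            session.query(Log)
            .filter(Log.event == "task stuck in queued", Log.dag_id == dag_id)
            .count()
        )
```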
##########
airflow/providers/amazon/aws/executors/ecs/ecs_executor.py:
##########
@@ -385,18 +385,25 @@ def attempt_task_runs(self):
                     )
                     self.pending_tasks.append(ecs_task)
                 else:
-                    self.send_message_to_task_logs(
-                        logging.ERROR,
-                        "ECS task %s has failed a maximum of %s times. Marking as failed. Reasons: %s",
-                        task_key,
-                        attempt_number,
-                        ", ".join(failure_reasons),
-                        ti=task_key,
+                    reasons_str = ", ".join(failure_reasons)
+                    self.log_task_event(
+                        record=Log(
+                            event="ecs executor queue error",
+                            task_instance=task_key,
+                            extra=(
+                                f"Task could not be queued after {attempt_number} attempts. "
+                                f"Marking as failed. Reasons: {reasons_str}"
+                            ),
+                        )
                     )
                     self.fail(task_key)
             elif not run_task_response["tasks"]:
-                self.send_message_to_task_logs(
-                    logging.ERROR, "ECS RunTask Response: %s", run_task_response, ti=task_key
+                self.log_task_event(
+                    record=Log(
+                        event="ecs runtime error",
Review Comment:
```suggestion
event="ecs execution error",
```
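Not blocking, just checking I read the new call correctly: my mental model of `log_task_event` is "persist the record as an audit-log row", roughly the sketch below. This is an assumption inferred from the call sites in this diff, not the actual helper added by the PR, which may attach more context.
```python
# Assumed shape of the helper, inferred from the call sites above; the real
# implementation in this PR may differ (e.g. attach owner or try_number).
from airflow.models.log import Log
from airflow.utils.session import create_session


def log_task_event(record: Log) -> None:
    # Write the executor-side event to the Log table so it shows up in the UI audit log.
    with create_session() as session:
        session.add(record)
```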
##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1817,22 +1830,34 @@ def _find_zombies(self) -> None:
if zombies:
self.log.warning("Failing (%s) jobs without heartbeat after %s",
len(zombies), limit_dttm)
- for ti, file_loc, processor_subdir in zombies:
- zombie_message_details = self._generate_zombie_message_details(ti)
- request = TaskCallbackRequest(
- full_filepath=file_loc,
- processor_subdir=processor_subdir,
- simple_task_instance=SimpleTaskInstance.from_ti(ti),
- msg=str(zombie_message_details),
- )
- log_message = (
- f"Detected zombie job: {request} "
- "(See https://airflow.apache.org/docs/apache-airflow/"
- "stable/core-concepts/tasks.html#zombie-undead-tasks)"
- )
- self._task_context_logger.error(log_message, ti=ti)
- self.job.executor.send_callback(request)
- Stats.incr("zombies_killed", tags={"dag_id": ti.dag_id, "task_id":
ti.task_id})
+ with create_session() as session:
+ for ti, file_loc, processor_subdir in zombies:
+ zombie_message_details =
self._generate_zombie_message_details(ti)
+ request = TaskCallbackRequest(
+ full_filepath=file_loc,
+ processor_subdir=processor_subdir,
+ simple_task_instance=SimpleTaskInstance.from_ti(ti),
+ msg=str(zombie_message_details),
+ )
+ log_message = (
+ f"Detected zombie job: {request} "
+ "(See https://airflow.apache.org/docs/apache-airflow/"
+ "stable/core-concepts/tasks.html#zombie-undead-tasks)"
+ )
+ session.add(
+ Log(
+ event="stale heartbeat",
Review Comment:
```suggestion
event="task has not heartbeat for a while",
```
```suggestion
event="zombie task",
```
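Either of the two above reads better to me than "stale heartbeat" on its own. Since operators will query these rows, here is the sort of lookup I would expect, purely illustrative and assuming the `Log` model's `event`/`dttm` columns plus the second suggested event name:
```python
# Illustrative query: the most recent zombie-task events recorded by the
# scheduler. Assumes the Log model columns (event, dttm, dag_id, task_id)
# and the "zombie task" event name suggested above.
from __future__ import annotations

from airflow.models.log import Log
from airflow.utils.session import create_session


def recent_zombie_events(limit: int = 20) -> list[tuple[str, str]]:
    with create_session() as session:
        rows = (
            session.query(Log.dag_id, Log.task_id)
            .filter(Log.event == "zombie task")
            .order_by(Log.dttm.desc())
            .limit(limit)
            .all()
        )
        return [(row.dag_id, row.task_id) for row in rows]
```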
##########
airflow/providers/amazon/aws/executors/ecs/ecs_executor.py:
##########
@@ -385,18 +385,25 @@ def attempt_task_runs(self):
                     )
                     self.pending_tasks.append(ecs_task)
                 else:
-                    self.send_message_to_task_logs(
-                        logging.ERROR,
-                        "ECS task %s has failed a maximum of %s times. Marking as failed. Reasons: %s",
-                        task_key,
-                        attempt_number,
-                        ", ".join(failure_reasons),
-                        ti=task_key,
+                    reasons_str = ", ".join(failure_reasons)
+                    self.log_task_event(
+                        record=Log(
+                            event="ecs executor queue error",
Review Comment:
```suggestion
event="ecs executor task error",
```
##########
airflow/providers/amazon/aws/executors/batch/batch_executor.py:
##########
@@ -292,12 +292,13 @@ def attempt_submit_jobs(self):
if failure_reason:
                 if attempt_number >= int(self.__class__.MAX_SUBMIT_JOB_ATTEMPTS):
-                    self.send_message_to_task_logs(
-                        logging.ERROR,
-                        "This job has been unsuccessfully attempted too many times (%s). Dropping the task. Reason: %s",
-                        attempt_number,
-                        failure_reason,
-                        ti=key,
+                    self.log_task_event(
+                        record=Log(
+                            event="executor queue failure",
Review Comment:
```suggestion
event="executor job failure",
```
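Same theme as my other comments: whatever names we settle on, it would help if they stay consistent across executors so one filter finds them all. A small example of what that buys us, using the event strings suggested in this review (illustrative only):
```python
# Illustrative only: a single audit-log filter catching executor-side failures
# from both the Batch and ECS executors, assuming the event names suggested in
# this review are adopted.
from airflow.models.log import Log
from airflow.utils.session import create_session

EXECUTOR_FAILURE_EVENTS = [
    "executor job failure",     # Batch executor (suggested above)
    "ecs executor task error",  # ECS executor queue error (suggested above)
    "ecs execution error",      # ECS RunTask failure (suggested above)
]


def executor_failure_count() -> int:
    with create_session() as session:
        return session.query(Log).filter(Log.event.in_(EXECUTOR_FAILURE_EVENTS)).count()
```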
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]