dstandish commented on code in PR #43647:
URL: https://github.com/apache/airflow/pull/43647#discussion_r1828209537
##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1815,48 +1816,47 @@ def _handle_tasks_stuck_in_queued(self, session:
Session = NEW_SESSION) -> None:
)
).all()
- num_allowed_retries = conf.getint("core", "num_stuck_reschedules")
+ num_allowed_retries = conf.getint("scheduler",
"num_stuck_in_queued_retries")
for executor, stuck_tis in
self._executor_to_tis(tasks_stuck_in_queued).items():
- try:
- cleaned_up_task_instances =
set(executor.cleanup_stuck_queued_tasks(tis=stuck_tis))
- for ti in stuck_tis:
- if repr(ti) in cleaned_up_task_instances:
- self.log.warning(
- "Marking task instance %s stuck in queued as
failed. "
- "If the task instance has available retries, it
will be retried.",
- ti,
- )
- session.add(
- Log(
- event=RESCHEDULE_STUCK_IN_QUEUED_EVENT,
- task_instance=ti.key,
- extra=(
- f"Task was stuck in queued and will be
requeued, once it has hit {num_allowed_retries} attempts"
- "Task will be marked as failed. After
that, if the task instance has "
- "available retries, it will be retried."
- ),
- )
- )
-
- num_times_stuck =
self._get_num_times_stuck_in_queued(ti, session)
- if num_times_stuck < num_allowed_retries:
- executor.change_state(ti.key, State.SCHEDULED)
- else:
- session.add(
- Log(
- event="failing stuck in queued",
- task_instance=ti.key,
- extra=(
- "Task will be marked as failed. If the
task instance has "
- "available retries, it will be
retried."
- ),
- )
- )
- executor.fail(ti.key)
+ if not hasattr(executor, "cleanup_stuck_queued_tasks"):
+ continue
+ for ti in executor.cleanup_stuck_queued_tasks(tis=stuck_tis):
Review Comment:
The thing that still bothers me, @dimberman, is that it doesn't feel right that
we defer to the executor and only conditionally log if it "cleans up" the ti. We
have already observed that it is stuck in queued, so why not log that?
I guess the problem is that we are logging the wrong event. The event is not
that it is "stuck in queued" (which is an unconditional observation) but rather
that it was requeued. _That's_ the thing that conditionally happens.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]