BALOGUN-DAVID commented on code in PR #65970:
URL: https://github.com/apache/airflow/pull/65970#discussion_r3229835954
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2552,92 +2551,117 @@ def _maybe_requeue_stuck_ti(self, *, ti, session,
executor):
Otherwise, fail it.
"""
- num_times_stuck = self._get_num_times_stuck_in_queued(ti, session)
+ current_ti = self._get_ti_still_stuck_in_queued(ti=ti, session=session)
+ if current_ti is None:
+ self.log.debug(
+ "Task changed state before queued-timeout recovery; skipping.
task_instance=%s",
+ ti,
+ )
+ return
+
+ num_times_stuck = self._get_num_times_stuck_in_queued(current_ti,
session)
if num_times_stuck < self._num_stuck_queued_retries:
- self.log.info("Task stuck in queued; will try to requeue.
task_instance=%s", ti)
+ self.log.info("Task stuck in queued; will try to requeue.
task_instance=%s", current_ti)
+ executor.revoke_task(ti=current_ti)
session.add(
Log(
event=TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT,
- task_instance=ti.key,
+ task_instance=current_ti.key,
extra=(
f"Task was in queued state for longer than
{self._task_queued_timeout} "
"seconds; task state will be set back to scheduled."
),
)
)
- self._reschedule_stuck_task(ti, session=session)
+ self._reschedule_stuck_task(current_ti, session=session)
else:
self.log.info(
"Task requeue attempts exceeded max; marking failed.
task_instance=%s",
- ti,
+ current_ti,
)
msg = f"Task was requeued more than
{self._num_stuck_queued_retries} times and will be failed."
session.add(
Log(
event="stuck in queued tries exceeded",
- task_instance=ti.key,
+ task_instance=current_ti.key,
extra=msg,
)
)
try:
- dag =
self.scheduler_dag_bag.get_dag_for_run(dag_run=ti.dag_run, session=session)
- task = dag.get_task(ti.task_id)
+ dag =
self.scheduler_dag_bag.get_dag_for_run(dag_run=current_ti.dag_run,
session=session)
+ task = dag.get_task(current_ti.task_id)
except Exception:
self.log.warning(
"The DAG or task could not be found. If a failure callback
exists, it will not be run.",
exc_info=True,
)
else:
if task.has_on_failure_callback:
- if inspect(ti).detached:
- ti = session.merge(ti)
+ if inspect(current_ti).detached:
+ current_ti = session.merge(current_ti)
# Safely extract bundle info with fallback for legacy tasks
# (dag_version may be None after Airflow 2 → 3 migration).
_stuck_bundle_name = (
- ti.dag_version.bundle_name if ti.dag_version else
ti.dag_model.bundle_name
+ current_ti.dag_version.bundle_name
+ if current_ti.dag_version
+ else current_ti.dag_model.bundle_name
)
_stuck_bundle_version = (
- ti.dag_version.bundle_version if ti.dag_version else
ti.dag_run.bundle_version
+ current_ti.dag_version.bundle_version
+ if current_ti.dag_version
+ else current_ti.dag_run.bundle_version
)
# Backfill dag_version_id for legacy tasks (Pydantic
requires uuid.UUID).
# Note: we cannot use `continue` here because this method
is not
# inside a loop. If backfilling fails we simply skip the
callback.
- if _ensure_ti_has_dag_version_id(ti, session, self.log):
+ if _ensure_ti_has_dag_version_id(current_ti, session,
self.log):
request = TaskCallbackRequest(
- filepath=ti.dag_model.relative_fileloc or "",
+ filepath=current_ti.dag_model.relative_fileloc or
"",
bundle_name=_stuck_bundle_name,
bundle_version=_stuck_bundle_version,
- ti=ti,
+ ti=current_ti,
msg=msg,
context_from_server=TIRunContext(
- dag_run=ti.dag_run,
- max_tries=ti.max_tries,
+ dag_run=current_ti.dag_run,
+ max_tries=current_ti.max_tries,
variables=[],
connections=[],
xcom_keys_to_clear=[],
),
)
executor.send_callback(request)
finally:
- ti.set_state(TaskInstanceState.FAILED, session=session)
- executor.fail(ti.key)
+ current_ti.set_state(TaskInstanceState.FAILED, session=session)
+ executor.fail(current_ti.key)
Review Comment:
You’re not missing anything — good catch, sir.
I’ll update the fail path to also call `executor.revoke_task(ti=current_ti)`
once the stale-state check passes and before marking the TI as failed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]