This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch v2-11-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-11-test by this push:
new d5fbdd6caff Fix stuck queued tasks by calling executor fail method and
invoking failure callbacks (#53038)
d5fbdd6caff is described below
commit d5fbdd6caff1c7cc9984abd3ea8c20a099494578
Author: Karen Braganza <[email protected]>
AuthorDate: Fri Feb 13 10:09:11 2026 -0500
Fix stuck queued tasks by calling executor fail method and invoking failure
callbacks (#53038)
This commit addresses the handling of tasks that remain stuck in the queued
state
beyond the configured retry threshold. Previously, these tasks were marked
as failed
in the database but the executor was not properly notified, leading to
inconsistent
state between Airflow and the executor.
Changes made:
- Modified _maybe_requeue_stuck_ti() to accept executor parameter and call
executor.fail()
when tasks exceed requeue attempts, ensuring the executor is notified of
task failures
- Added logic to retrieve the DAG and task object to check for
on_failure_callback
- When a failure callback exists, create a TaskCallbackRequest and send it
via executor
to ensure failure callbacks are invoked for stuck queued tasks
- Updated tests to verify that executor.fail() is called and callbacks are
sent appropriately
- Added test fixtures with mock_failure_callback to validate callback
invocation
- Changed pyproject.toml Python version requirement (unrelated; note: no pyproject.toml change appears in this commit's diffstat)
Why this matters:
Tasks stuck in queued state can now be properly cleaned up at both the
Airflow scheduler
and executor levels, preventing resource leaks and ensuring failure
callbacks are executed
for proper error handling and alerting.
Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot]
<49699333+dependabot[bot]@users.noreply.github.com>
---
airflow/jobs/scheduler_job_runner.py | 36 ++++++++++++++++++++++++++++--------
tests/jobs/test_scheduler_job.py | 18 ++++++++++++++----
2 files changed, 42 insertions(+), 12 deletions(-)
diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index 26bd77e1023..2725dd71d9f 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -33,7 +33,7 @@ from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable, Iterator
from deprecated import deprecated
-from sqlalchemy import and_, delete, desc, func, not_, or_, select, text, update
+from sqlalchemy import and_, delete, desc, func, inspect, not_, or_, select, text, update
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import lazyload, load_only, make_transient, selectinload
from sqlalchemy.sql import expression
@@ -1810,10 +1810,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
try:
for ti in stuck_tis:
executor.revoke_task(ti=ti)
- self._maybe_requeue_stuck_ti(
- ti=ti,
- session=session,
- )
+ self._maybe_requeue_stuck_ti(ti=ti, session=session, executor=executor)
except NotImplementedError:
# this block only gets entered if the executor has not implemented `revoke_task`.
# in which case, we try the fallback logic
@@ -1833,7 +1830,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
)
)
- def _maybe_requeue_stuck_ti(self, *, ti, session):
+ def _maybe_requeue_stuck_ti(self, *, ti, session, executor):
"""
Requeue task if it has not been attempted too many times.
@@ -1858,14 +1855,37 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
"Task requeue attempts exceeded max; marking failed. task_instance=%s",
ti,
)
+ msg = f"Task was requeued more than {self._num_stuck_queued_retries} times and will be failed."
session.add(
Log(
event="stuck in queued tries exceeded",
task_instance=ti.key,
- extra=f"Task was requeued more than {self._num_stuck_queued_retries} times and will be failed.",
+ extra=msg,
)
)
- ti.set_state(TaskInstanceState.FAILED, session=session)
+ try:
+ dag = self.dagbag.get_dag(ti.dag_id)
+ task = dag.get_task(ti.task_id)
+ except Exception:
+ self.log.warning(
+ "The DAG or task could not be found. If a failure callback exists, it will not be run.",
+ exc_info=True,
+ )
+ else:
+ ti.task = task
+ if task.on_failure_callback:
+ if inspect(ti).detached:
+ ti = session.merge(ti)
+ request = TaskCallbackRequest(
+ full_filepath=ti.dag_model.fileloc,
+ simple_task_instance=SimpleTaskInstance.from_ti(ti),
+ msg=msg,
+ processor_subdir=ti.dag_model.processor_subdir,
+ )
+ executor.send_callback(request)
+ finally:
+ ti.set_state(TaskInstanceState.FAILED, session=session)
+ executor.fail(ti.key)
@deprecated(
reason="This is backcompat layer for older executor interface. Should be removed in 3.0",
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index f8a879145df..abf394743ee 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -2243,11 +2243,15 @@ class TestSchedulerJob:
mock_exec_1.cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti2, ti1])
mock_exec_2.cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti3])
+ @staticmethod
+ def mock_failure_callback(context):
+ pass
+
@conf_vars({("scheduler", "num_stuck_in_queued_retries"): "2"})
def test_handle_stuck_queued_tasks_multiple_attempts(self, dag_maker, session, mock_executors):
"""Verify that tasks stuck in queued will be rescheduled up to N times."""
with dag_maker("test_fail_stuck_queued_tasks_multiple_executors"):
- EmptyOperator(task_id="op1")
+ EmptyOperator(task_id="op1", on_failure_callback=TestSchedulerJob.mock_failure_callback)
EmptyOperator(task_id="op2", executor="default_exec")
def _queue_tasks(tis):
@@ -2310,16 +2314,19 @@ class TestSchedulerJob:
"stuck in queued tries exceeded",
]
- mock_executors[0].fail.assert_not_called() # just demoing that we don't fail with executor method
+ mock_executors[
+ 0
+ ].send_callback.assert_called_once() # this should only be called for the task that has a callback
states = [x.state for x in dr.get_task_instances(session=session)]
assert states == ["failed", "failed"]
+ mock_executors[0].fail.assert_called()
@conf_vars({("scheduler", "num_stuck_in_queued_retries"): "2"})
def test_handle_stuck_queued_tasks_reschedule_sensors(self, dag_maker, session, mock_executors):
"""Reschedule sensors go in and out of running repeatedly using the same try_number
Make sure that they get three attempts per reschedule, not 3 attempts per try_number"""
with dag_maker("test_fail_stuck_queued_tasks_multiple_executors"):
- EmptyOperator(task_id="op1")
+ EmptyOperator(task_id="op1", on_failure_callback=TestSchedulerJob.mock_failure_callback)
EmptyOperator(task_id="op2", executor="default_exec")
def _queue_tasks(tis):
@@ -2409,9 +2416,12 @@ class TestSchedulerJob:
"stuck in queued tries exceeded",
]
- mock_executors[0].fail.assert_not_called() # just demoing that we don't fail with executor method
+ mock_executors[
+ 0
+ ].send_callback.assert_called_once() # this should only be called for the task that has a callback
states = [x.state for x in dr.get_task_instances(session=session)]
assert states == ["failed", "failed"]
+ mock_executors[0].fail.assert_called()
def test_revoke_task_not_imp_tolerated(self, dag_maker, session, caplog):
"""Test that if executor no implement revoke_task then we don't blow up."""