This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v2-11-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-11-test by this push:
     new d5fbdd6caff Fix stuck queued tasks by calling executor fail method and invoking failure callbacks (#53038)
d5fbdd6caff is described below

commit d5fbdd6caff1c7cc9984abd3ea8c20a099494578
Author: Karen Braganza <[email protected]>
AuthorDate: Fri Feb 13 10:09:11 2026 -0500

    Fix stuck queued tasks by calling executor fail method and invoking failure callbacks (#53038)
    
    This commit addresses the handling of tasks that remain stuck in the queued state
    beyond the configured retry threshold. Previously, these tasks were marked as failed
    in the database but the executor was not properly notified, leading to inconsistent
    state between Airflow and the executor.
    
    Changes made:
    - Modified _maybe_requeue_stuck_ti() to accept executor parameter and call executor.fail()
      when tasks exceed requeue attempts, ensuring the executor is notified of task failures
    - Added logic to retrieve the DAG and task object to check for on_failure_callback
    - When a failure callback exists, create a TaskCallbackRequest and send it via executor
      to ensure failure callbacks are invoked for stuck queued tasks
    - Updated tests to verify that executor.fail() is called and callbacks are sent appropriately
    - Added test fixtures with mock_failure_callback to validate callback invocation
    
    Why this matters:
    Tasks stuck in queued state can now be properly cleaned up at both the Airflow scheduler
    and executor levels, preventing resource leaks and ensuring failure callbacks are executed
    for proper error handling and alerting.
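
    As a minimal, illustrative sketch (the DAG and callback names below are
    hypothetical, not part of this change), a task-level on_failure_callback
    defined like this will now also be invoked when the task is failed after
    exceeding the stuck-in-queued retry limit:

        from datetime import datetime

        from airflow import DAG
        from airflow.operators.empty import EmptyOperator

        def notify_on_failure(context):
            # Called by Airflow with the task context when the task instance fails,
            # including when the scheduler fails it for being stuck in queued.
            ti = context["task_instance"]
            print(f"{ti.task_id} failed after being stuck in queued")

        with DAG("stuck_queued_alerting_example", start_date=datetime(2024, 1, 1), schedule=None):
            EmptyOperator(task_id="op1", on_failure_callback=notify_on_failure)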
    
---
 airflow/jobs/scheduler_job_runner.py | 36 ++++++++++++++++++++++++++++--------
 tests/jobs/test_scheduler_job.py     | 18 ++++++++++++++----
 2 files changed, 42 insertions(+), 12 deletions(-)

diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index 26bd77e1023..2725dd71d9f 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -33,7 +33,7 @@ from pathlib import Path
 from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable, Iterator
 
 from deprecated import deprecated
-from sqlalchemy import and_, delete, desc, func, not_, or_, select, text, update
+from sqlalchemy import and_, delete, desc, func, inspect, not_, or_, select, text, update
 from sqlalchemy.exc import OperationalError
 from sqlalchemy.orm import lazyload, load_only, make_transient, selectinload
 from sqlalchemy.sql import expression
@@ -1810,10 +1810,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             try:
                 for ti in stuck_tis:
                     executor.revoke_task(ti=ti)
-                    self._maybe_requeue_stuck_ti(
-                        ti=ti,
-                        session=session,
-                    )
+                    self._maybe_requeue_stuck_ti(ti=ti, session=session, executor=executor)
             except NotImplementedError:
                 # this block only gets entered if the executor has not implemented `revoke_task`.
                 # in which case, we try the fallback logic
@@ -1833,7 +1830,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             )
         )
 
-    def _maybe_requeue_stuck_ti(self, *, ti, session):
+    def _maybe_requeue_stuck_ti(self, *, ti, session, executor):
         """
         Requeue task if it has not been attempted too many times.
 
@@ -1858,14 +1855,37 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                 "Task requeue attempts exceeded max; marking failed. 
task_instance=%s",
                 ti,
             )
+            msg = f"Task was requeued more than {self._num_stuck_queued_retries} times and will be failed."
             session.add(
                 Log(
                     event="stuck in queued tries exceeded",
                     task_instance=ti.key,
-                    extra=f"Task was requeued more than {self._num_stuck_queued_retries} times and will be failed.",
+                    extra=msg,
                 )
             )
-            ti.set_state(TaskInstanceState.FAILED, session=session)
+            try:
+                dag = self.dagbag.get_dag(ti.dag_id)
+                task = dag.get_task(ti.task_id)
+            except Exception:
+                self.log.warning(
+                    "The DAG or task could not be found. If a failure callback exists, it will not be run.",
+                    exc_info=True,
+                )
+            else:
+                ti.task = task
+                if task.on_failure_callback:
+                    if inspect(ti).detached:
+                        ti = session.merge(ti)
+                    request = TaskCallbackRequest(
+                        full_filepath=ti.dag_model.fileloc,
+                        simple_task_instance=SimpleTaskInstance.from_ti(ti),
+                        msg=msg,
+                        processor_subdir=ti.dag_model.processor_subdir,
+                    )
+                    executor.send_callback(request)
+            finally:
+                ti.set_state(TaskInstanceState.FAILED, session=session)
+                executor.fail(ti.key)
 
     @deprecated(
         reason="This is backcompat layer for older executor interface. Should 
be removed in 3.0",
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index f8a879145df..abf394743ee 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -2243,11 +2243,15 @@ class TestSchedulerJob:
         mock_exec_1.cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti2, ti1])
         mock_exec_2.cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti3])
 
+    @staticmethod
+    def mock_failure_callback(context):
+        pass
+
     @conf_vars({("scheduler", "num_stuck_in_queued_retries"): "2"})
     def test_handle_stuck_queued_tasks_multiple_attempts(self, dag_maker, session, mock_executors):
         """Verify that tasks stuck in queued will be rescheduled up to N times."""
         with dag_maker("test_fail_stuck_queued_tasks_multiple_executors"):
-            EmptyOperator(task_id="op1")
+            EmptyOperator(task_id="op1", on_failure_callback=TestSchedulerJob.mock_failure_callback)
             EmptyOperator(task_id="op2", executor="default_exec")
 
         def _queue_tasks(tis):
@@ -2310,16 +2314,19 @@ class TestSchedulerJob:
             "stuck in queued tries exceeded",
         ]
 
-        mock_executors[0].fail.assert_not_called()  # just demoing that we don't fail with executor method
+        mock_executors[
+            0
+        ].send_callback.assert_called_once()  # this should only be called for the task that has a callback
         states = [x.state for x in dr.get_task_instances(session=session)]
         assert states == ["failed", "failed"]
+        mock_executors[0].fail.assert_called()
 
     @conf_vars({("scheduler", "num_stuck_in_queued_retries"): "2"})
     def test_handle_stuck_queued_tasks_reschedule_sensors(self, dag_maker, session, mock_executors):
         """Reschedule sensors go in and out of running repeatedly using the same try_number
         Make sure that they get three attempts per reschedule, not 3 attempts per try_number"""
         with dag_maker("test_fail_stuck_queued_tasks_multiple_executors"):
-            EmptyOperator(task_id="op1")
+            EmptyOperator(task_id="op1", on_failure_callback=TestSchedulerJob.mock_failure_callback)
             EmptyOperator(task_id="op2", executor="default_exec")
 
         def _queue_tasks(tis):
@@ -2409,9 +2416,12 @@ class TestSchedulerJob:
             "stuck in queued tries exceeded",
         ]
 
-        mock_executors[0].fail.assert_not_called()  # just demoing that we don't fail with executor method
+        mock_executors[
+            0
+        ].send_callback.assert_called_once()  # this should only be called for the task that has a callback
         states = [x.state for x in dr.get_task_instances(session=session)]
         assert states == ["failed", "failed"]
+        mock_executors[0].fail.assert_called()
 
     def test_revoke_task_not_imp_tolerated(self, dag_maker, session, caplog):
         """Test that if executor no implement revoke_task then we don't blow 
up."""
