This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-0-test by this push:
     new 60ee14c2c3d [v3-0-test] Allow failure callbacks for stuck in queued 
TIs that fail (#53435) (#54401)
60ee14c2c3d is described below

commit 60ee14c2c3d1b433517d8dab737c4ecb701e0bfe
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Wed Aug 13 13:39:30 2025 +0100

    [v3-0-test] Allow failure callbacks for stuck in queued TIs that fail 
(#53435) (#54401)
    
    In issue #51301, it was reported that in Airflow 2.10.5 failure callbacks 
do not run for task instances that get stuck in the queued state and are then 
failed. This is caused by the changes introduced in PR #43520, which added 
logic to requeue tasks that get stuck in queued (up to two times by default) 
before failing them.
    
    Previously, the executor's fail method was called when a task had to be 
failed after the maximum number of requeue attempts. That PR replaced it with 
the task instance's set_state method (ti.set_state(TaskInstanceState.FAILED, 
session=session)). Because the executor's fail method is no longer called, 
failure callbacks are not executed for such task instances. This change 
therefore calls the executor's fail method again in Airflow 3, after setting 
the task instance state to FAILED. In addition, when the task defines an 
on_failure_callback, it sends a TaskCallbackRequest to the executor so that the 
callback runs.
    (cherry picked from commit 6da77b1fdfc0b51762b47638489e752384911758)
    
    Co-authored-by: Karen Braganza <[email protected]>
---
 .../src/airflow/jobs/scheduler_job_runner.py       | 38 ++++++++++++++++++++--
 airflow-core/tests/unit/jobs/test_scheduler_job.py | 18 +++++++---
 2 files changed, 49 insertions(+), 7 deletions(-)

diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py 
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 0a47913ad50..2ad76af29c8 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -2014,6 +2014,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                     self._maybe_requeue_stuck_ti(
                         ti=ti,
                         session=session,
+                        executor=executor,
                     )
                     session.commit()
             except NotImplementedError:
@@ -2029,7 +2030,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             )
         )
 
-    def _maybe_requeue_stuck_ti(self, *, ti, session):
+    def _maybe_requeue_stuck_ti(self, *, ti, session, executor):
         """
         Requeue task if it has not been attempted too many times.
 
@@ -2054,14 +2055,45 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                 "Task requeue attempts exceeded max; marking failed. 
task_instance=%s",
                 ti,
             )
+            msg = f"Task was requeued more than 
{self._num_stuck_queued_retries} times and will be failed."
             session.add(
                 Log(
                     event="stuck in queued tries exceeded",
                     task_instance=ti.key,
-                    extra=f"Task was requeued more than 
{self._num_stuck_queued_retries} times and will be failed.",
+                    extra=msg,
                 )
             )
-            ti.set_state(TaskInstanceState.FAILED, session=session)
+
+            try:
+                dag = self.scheduler_dag_bag.get_dag(dag_run=ti.dag_run, 
session=session)
+                task = dag.get_task(ti.task_id)
+            except Exception:
+                self.log.warning(
+                    "The DAG or task could not be found. If a failure callback 
exists, it will not be run.",
+                    exc_info=True,
+                )
+            else:
+                if task.on_failure_callback:
+                    if inspect(ti).detached:
+                        ti = session.merge(ti)
+                    request = TaskCallbackRequest(
+                        filepath=ti.dag_model.relative_fileloc,
+                        bundle_name=ti.dag_version.bundle_name,
+                        bundle_version=ti.dag_version.bundle_version,
+                        ti=ti,
+                        msg=msg,
+                        context_from_server=TIRunContext(
+                            dag_run=ti.dag_run,
+                            max_tries=ti.max_tries,
+                            variables=[],
+                            connections=[],
+                            xcom_keys_to_clear=[],
+                        ),
+                    )
+                    executor.send_callback(request)
+            finally:
+                ti.set_state(TaskInstanceState.FAILED, session=session)
+                executor.fail(ti.key)
 
     def _reschedule_stuck_task(self, ti: TaskInstance, session: Session):
         session.execute(
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py 
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index c451f9d0a91..058504880af 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -2003,11 +2003,15 @@ class TestSchedulerJob:
         # Second executor called for ti3
         
mock_executors[1].try_adopt_task_instances.assert_called_once_with([ti3])
 
+    @staticmethod
+    def mock_failure_callback(context):
+        pass
+
     @conf_vars({("scheduler", "num_stuck_in_queued_retries"): "2"})
     def test_handle_stuck_queued_tasks_multiple_attempts(self, dag_maker, 
session, mock_executors):
         """Verify that tasks stuck in queued will be rescheduled up to N 
times."""
         with dag_maker("test_fail_stuck_queued_tasks_multiple_executors"):
-            EmptyOperator(task_id="op1")
+            EmptyOperator(task_id="op1", 
on_failure_callback=TestSchedulerJob.mock_failure_callback)
             EmptyOperator(task_id="op2", executor="default_exec")
 
         def _queue_tasks(tis):
@@ -2073,16 +2077,19 @@ class TestSchedulerJob:
             "stuck in queued tries exceeded",
         ]
 
-        mock_executors[0].fail.assert_not_called()  # just demoing that we 
don't fail with executor method
+        mock_executors[
+            0
+        ].send_callback.assert_called_once()  # this should only be called for 
the task that has a callback
         states = [x.state for x in dr.get_task_instances(session=session)]
         assert states == ["failed", "failed"]
+        mock_executors[0].fail.assert_called()
 
     @conf_vars({("scheduler", "num_stuck_in_queued_retries"): "2"})
     def test_handle_stuck_queued_tasks_reschedule_sensors(self, dag_maker, 
session, mock_executors):
         """Reschedule sensors go in and out of running repeatedly using the 
same try_number
         Make sure that they get three attempts per reschedule, not 3 attempts 
per try_number"""
         with dag_maker("test_fail_stuck_queued_tasks_multiple_executors"):
-            EmptyOperator(task_id="op1")
+            EmptyOperator(task_id="op1", 
on_failure_callback=TestSchedulerJob.mock_failure_callback)
             EmptyOperator(task_id="op2", executor="default_exec")
 
         def _queue_tasks(tis):
@@ -2172,9 +2179,12 @@ class TestSchedulerJob:
             "stuck in queued tries exceeded",
         ]
 
-        mock_executors[0].fail.assert_not_called()  # just demoing that we 
don't fail with executor method
+        mock_executors[
+            0
+        ].send_callback.assert_called_once()  # this should only be called for 
the task that has a callback
         states = [x.state for x in dr.get_task_instances(session=session)]
         assert states == ["failed", "failed"]
+        mock_executors[0].fail.assert_called()
 
     def test_revoke_task_not_imp_tolerated(self, dag_maker, session, caplog):
         """Test that if executor no implement revoke_task then we don't blow 
up."""

Reply via email to