This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 04d3c440e3c Fix retry callbacks not executing for externally killed tasks (#56586)
04d3c440e3c is described below

commit 04d3c440e3c741b0f2d9ac4d34d2f09f8b41a38b
Author: Kaxil Naik <[email protected]>
AuthorDate: Mon Oct 13 17:31:53 2025 -0700

    Fix retry callbacks not executing for externally killed tasks (#56586)
    
    * Fix retry callbacks not executing for externally killed tasks
    
    When tasks with remaining retries were killed externally, the
    `on_failure_callback` was incorrectly executed instead of
    `on_retry_callback`.
    
    The scheduler now correctly sets the callback type to `UP_FOR_RETRY` when
    tasks are eligible to retry, ensuring proper callback and email routing
    (see the sketch below).
    For heartbeat timeouts, the task is loaded before evaluating retry
    eligibility to access the task's retry configuration.
    
    Fixes #56196
    
    * fixup! Fix retry callbacks not executing for externally killed tasks
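
    Illustrative sketch (not from this commit; all ids, callables, and import
    paths below are assumptions and may differ between Airflow releases):
    attaching both callbacks to a task shows the routing this change fixes.
    With retries remaining, an external kill should now invoke
    `on_retry_callback`; `on_failure_callback` fires only once retries are
    exhausted.

        from datetime import timedelta

        # Import paths are assumptions; they differ between Airflow releases.
        from airflow import DAG
        from airflow.providers.standard.operators.empty import EmptyOperator

        def notify_retry(context):
            # Expected when the task is killed externally while retries remain.
            print("will retry:", context["ti"].task_id)

        def notify_failure(context):
            # Expected only once retries are exhausted.
            print("finally failed:", context["ti"].task_id)

        with DAG(dag_id="example_dag", schedule=None):  # hypothetical DAG
            EmptyOperator(
                task_id="example_task",  # hypothetical task id
                retries=2,
                retry_delay=timedelta(minutes=5),
                on_retry_callback=notify_retry,
                on_failure_callback=notify_failure,
            )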
---
 .../src/airflow/jobs/scheduler_job_runner.py       |  5 +++
 airflow-core/tests/unit/jobs/test_scheduler_job.py | 38 ++++++++++++++++++++++
 2 files changed, 43 insertions(+)

diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 36610142622..efadab3c1bb 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -933,6 +933,11 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                         bundle_version=ti.dag_version.bundle_version,
                         ti=ti,
                         msg=msg,
+                        task_callback_type=(
+                            TaskInstanceState.UP_FOR_RETRY
+                            if ti.is_eligible_to_retry()
+                            else TaskInstanceState.FAILED
+                        ),
                         context_from_server=TIRunContext(
                             dag_run=DRDataModel.model_validate(ti.dag_run, from_attributes=True),
                             max_tries=ti.max_tries,
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index feceac3de32..7c03dd4fa12 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -502,6 +502,7 @@ class TestSchedulerJob:
             "finished with state failed, but the task instance's state 
attribute is queued. "
             "Learn more: 
https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally";,
             context_from_server=mock.ANY,
+            task_callback_type=TaskInstanceState.FAILED,
         )
         scheduler_job.executor.callback_sink.send.assert_called_once_with(task_callback)
         scheduler_job.executor.callback_sink.reset_mock()
@@ -6775,6 +6776,43 @@ class TestSchedulerJob:
         assert callback_request.context_from_server.dag_run.logical_date == dag_run.logical_date
         assert callback_request.context_from_server.max_tries == ti.max_tries
 
+    @pytest.mark.parametrize(
+        "retries,callback_kind,expected",
+        [
+            (1, "retry", TaskInstanceState.UP_FOR_RETRY),
+            (0, "failure", TaskInstanceState.FAILED),
+        ],
+    )
+    def test_external_kill_sets_callback_type_param(
+        self, dag_maker, session, retries, callback_kind, expected
+    ):
+        """External kill should mark callback type based on retry 
eligibility."""
+        with dag_maker(dag_id=f"ext_kill_{callback_kind}", fileloc="/test_path1/"):
+            if callback_kind == "retry":
+                EmptyOperator(task_id="t1", retries=retries, on_retry_callback=lambda ctx: None)
+            else:
+                EmptyOperator(task_id="t1", retries=retries, on_failure_callback=lambda ctx: None)
+        dr = dag_maker.create_dagrun(state=DagRunState.RUNNING)
+        ti = dr.get_task_instance(task_id="t1")
+
+        executor = MockExecutor(do_update=False)
+        scheduler_job = Job(executor=executor)
+        self.job_runner = SchedulerJobRunner(scheduler_job)
+
+        ti.state = State.QUEUED
+        session.merge(ti)
+        session.commit()
+
+        # Executor reports task finished (FAILED) while TI still QUEUED -> external kill path
+        executor.event_buffer[ti.key] = State.FAILED, None
+
+        self.job_runner._process_executor_events(executor=executor, session=session)
+
+        scheduler_job.executor.callback_sink.send.assert_called()
+        request = scheduler_job.executor.callback_sink.send.call_args[0][0]
+        assert isinstance(request, TaskCallbackRequest)
+        assert request.task_callback_type == expected
+
     def test_scheduler_passes_context_from_server_on_task_failure(self, dag_maker, session):
         """Test that scheduler passes context_from_server when handling task failures."""
         with dag_maker(dag_id="test_dag", session=session):
