This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 04d3c440e3c Fix retry callbacks not executing for externally killed
tasks (#56586)
04d3c440e3c is described below
commit 04d3c440e3c741b0f2d9ac4d34d2f09f8b41a38b
Author: Kaxil Naik <[email protected]>
AuthorDate: Mon Oct 13 17:31:53 2025 -0700
Fix retry callbacks not executing for externally killed tasks (#56586)
* Fix retry callbacks not executing for externally killed tasks
When tasks with remaining retries were killed externally, the
`on_failure_callback` was incorrectly executed
instead of `on_retry_callback`.
The scheduler now correctly sets the callback type to `UP_FOR_RETRY` when
tasks are eligible to retry, ensuring proper callback and email routing.
For heartbeat timeouts, the task is loaded before evaluating retry
eligibility to access the task's retry configuration.
Fixes #56196
* fixup! Fix retry callbacks not executing for externally killed tasks
---
.../src/airflow/jobs/scheduler_job_runner.py | 5 +++
airflow-core/tests/unit/jobs/test_scheduler_job.py | 38 ++++++++++++++++++++++
2 files changed, 43 insertions(+)
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 36610142622..efadab3c1bb 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -933,6 +933,11 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
bundle_version=ti.dag_version.bundle_version,
ti=ti,
msg=msg,
+ task_callback_type=(
+ TaskInstanceState.UP_FOR_RETRY
+ if ti.is_eligible_to_retry()
+ else TaskInstanceState.FAILED
+ ),
context_from_server=TIRunContext(
dag_run=DRDataModel.model_validate(ti.dag_run,
from_attributes=True),
max_tries=ti.max_tries,
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index feceac3de32..7c03dd4fa12 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -502,6 +502,7 @@ class TestSchedulerJob:
"finished with state failed, but the task instance's state
attribute is queued. "
"Learn more:
https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally",
context_from_server=mock.ANY,
+ task_callback_type=TaskInstanceState.FAILED,
)
scheduler_job.executor.callback_sink.send.assert_called_once_with(task_callback)
scheduler_job.executor.callback_sink.reset_mock()
@@ -6775,6 +6776,43 @@ class TestSchedulerJob:
assert callback_request.context_from_server.dag_run.logical_date ==
dag_run.logical_date
assert callback_request.context_from_server.max_tries == ti.max_tries
+ @pytest.mark.parametrize(
+ "retries,callback_kind,expected",
+ [
+ (1, "retry", TaskInstanceState.UP_FOR_RETRY),
+ (0, "failure", TaskInstanceState.FAILED),
+ ],
+ )
+ def test_external_kill_sets_callback_type_param(
+ self, dag_maker, session, retries, callback_kind, expected
+ ):
+ """External kill should mark callback type based on retry
eligibility."""
+ with dag_maker(dag_id=f"ext_kill_{callback_kind}",
fileloc="/test_path1/"):
+ if callback_kind == "retry":
+ EmptyOperator(task_id="t1", retries=retries,
on_retry_callback=lambda ctx: None)
+ else:
+ EmptyOperator(task_id="t1", retries=retries,
on_failure_callback=lambda ctx: None)
+ dr = dag_maker.create_dagrun(state=DagRunState.RUNNING)
+ ti = dr.get_task_instance(task_id="t1")
+
+ executor = MockExecutor(do_update=False)
+ scheduler_job = Job(executor=executor)
+ self.job_runner = SchedulerJobRunner(scheduler_job)
+
+ ti.state = State.QUEUED
+ session.merge(ti)
+ session.commit()
+
+ # Executor reports task finished (FAILED) while TI still QUEUED ->
external kill path
+ executor.event_buffer[ti.key] = State.FAILED, None
+
+ self.job_runner._process_executor_events(executor=executor,
session=session)
+
+ scheduler_job.executor.callback_sink.send.assert_called()
+ request = scheduler_job.executor.callback_sink.send.call_args[0][0]
+ assert isinstance(request, TaskCallbackRequest)
+ assert request.task_callback_type == expected
+
def test_scheduler_passes_context_from_server_on_task_failure(self,
dag_maker, session):
"""Test that scheduler passes context_from_server when handling task
failures."""
with dag_maker(dag_id="test_dag", session=session):