This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch v2-11-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-11-test by this push:
new c64bb863446 Prevent stale zombie callbacks from failing newer task
attempts (#63726)
c64bb863446 is described below
commit c64bb863446e12c4b8fb91c5b1d8ff8dca73e334
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Mon Mar 16 22:03:39 2026 +0100
Prevent stale zombie callbacks from failing newer task attempts (#63726)
A zombie failure callback queued for try 1 can be executed after try 2 has
already succeeded. Running that stale callback then incorrectly marks the
current task instance as failed in the database.
Skip failure callbacks whose request try_number no longer matches the
current task instance's try_number, and add a regression test covering
the stale-callback case.
---
airflow/dag_processing/processor.py | 9 +++++++
tests/dag_processing/test_processor.py | 44 ++++++++++++++++++++++++++++++++++
2 files changed, 53 insertions(+)
diff --git a/airflow/dag_processing/processor.py
b/airflow/dag_processing/processor.py
index 86db0b5b881..afd23a365dd 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -846,6 +846,15 @@ class DagFileProcessor(LoggingMixin):
if not ti:
return
+ if request.is_failure_callback and simple_ti.try_number !=
ti.try_number:
+ cls.logger().warning(
+ "Skipping stale failure callback for %s: callback
try_number=%s, current try_number=%s",
+ ti,
+ simple_ti.try_number,
+ ti.try_number,
+ )
+ return
+
task: Operator | None = None
if dagbag and simple_ti.dag_id in dagbag.dags:
diff --git a/tests/dag_processing/test_processor.py
b/tests/dag_processing/test_processor.py
index e006f36478f..5fd1482d6ee 100644
--- a/tests/dag_processing/test_processor.py
+++ b/tests/dag_processing/test_processor.py
@@ -533,6 +533,50 @@ class TestDagFileProcessor:
error="Message", test_mode=conf.getboolean("core",
"unit_test_mode"), session=session
)
+ @pytest.mark.skip_if_database_isolation_mode # Test is broken in db
isolation mode
+ @patch.object(DagFileProcessor, "logger")  # captures the stale-callback warning asserted below
+ @patch.object(TaskInstance, "handle_failure")  # must stay uncalled for a stale callback
+ def test_execute_on_failure_callbacks_skips_stale_try_number(self,
mock_ti_handle_failure, mock_logger):
+ dagbag = DagBag(dag_folder="/dev/null", include_examples=True,
read_dags_from_db=False)
+ dag_file_processor = DagFileProcessor(
+ dag_ids=[], dag_directory=TEST_DAGS_FOLDER, log=mock.MagicMock()
+ )
+ with create_session() as session:
+ session.query(TaskInstance).delete()  # start from an empty task_instance table
+ dag = dagbag.get_dag("example_branch_operator")
+ dagrun = dag.create_dagrun(
+ state=State.RUNNING,
+ execution_date=DEFAULT_DATE,
+ run_type=DagRunType.SCHEDULED,
+ data_interval=dag.infer_automated_data_interval(DEFAULT_DATE),
+ session=session,
+ )
+ task = dag.get_task(task_id="run_this_first")
+ ti = TaskInstance(task, run_id=dagrun.run_id, state=State.RUNNING)
+ session.add(ti)
+ session.flush()
+
+ simple_ti = SimpleTaskInstance.from_ti(ti)  # snapshot taken before the retry below
+ ti.try_number += 1  # simulate a newer attempt; the snapshot's try_number is now stale
+ session.flush()
+
+ requests = [
+ TaskCallbackRequest(
+ full_filepath="A",
+ simple_task_instance=simple_ti,
+ msg="Message",
+ )
+ ]
+ dag_file_processor.execute_callbacks(dagbag, requests,
dag_file_processor.UNIT_TEST_MODE, session)
+
+ mock_ti_handle_failure.assert_not_called()  # the stale callback must not fail the current attempt
+ mock_logger.return_value.warning.assert_called_once_with(
+ "Skipping stale failure callback for %s: callback try_number=%s,
current try_number=%s",
+ ti,
+ simple_ti.try_number,
+ ti.try_number,
+ )
+
@pytest.mark.skip_if_database_isolation_mode # Test is broken in db
isolation mode
@pytest.mark.parametrize(
["has_serialized_dag"],