This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-11-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-11-test by this push:
     new c64bb863446 Prevent stale zombie callbacks from failing newer task attempts (#63726)
c64bb863446 is described below

commit c64bb863446e12c4b8fb91c5b1d8ff8dca73e334
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Mon Mar 16 22:03:39 2026 +0100

    Prevent stale zombie callbacks from failing newer task attempts (#63726)
    
    A zombie failure callback from try 1 can be executed after try 2 has
    already succeeded. In that case, running the stale callback incorrectly
    marks the current task instance as failed in the database.
    
    Skip failure callbacks when the callback request try_number no longer
    matches the current task instance try_number, and add a regression test
    covering the stale-callback case.
---
 airflow/dag_processing/processor.py    |  9 +++++++
 tests/dag_processing/test_processor.py | 44 ++++++++++++++++++++++++++++++++++
 2 files changed, 53 insertions(+)

diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py
index 86db0b5b881..afd23a365dd 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -846,6 +846,15 @@ class DagFileProcessor(LoggingMixin):
         if not ti:
             return
 
+        if request.is_failure_callback and simple_ti.try_number != ti.try_number:
+            cls.logger().warning(
+                "Skipping stale failure callback for %s: callback try_number=%s, current try_number=%s",
+                ti,
+                simple_ti.try_number,
+                ti.try_number,
+            )
+            return
+
         task: Operator | None = None
 
         if dagbag and simple_ti.dag_id in dagbag.dags:
diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py
index e006f36478f..5fd1482d6ee 100644
--- a/tests/dag_processing/test_processor.py
+++ b/tests/dag_processing/test_processor.py
@@ -533,6 +533,50 @@ class TestDagFileProcessor:
             error="Message", test_mode=conf.getboolean("core", "unit_test_mode"), session=session
         )
 
+    @pytest.mark.skip_if_database_isolation_mode  # Test is broken in db isolation mode
+    @patch.object(DagFileProcessor, "logger")
+    @patch.object(TaskInstance, "handle_failure")
+    def test_execute_on_failure_callbacks_skips_stale_try_number(self, mock_ti_handle_failure, mock_logger):
+        dagbag = DagBag(dag_folder="/dev/null", include_examples=True, read_dags_from_db=False)
+        dag_file_processor = DagFileProcessor(
+            dag_ids=[], dag_directory=TEST_DAGS_FOLDER, log=mock.MagicMock()
+        )
+        with create_session() as session:
+            session.query(TaskInstance).delete()
+            dag = dagbag.get_dag("example_branch_operator")
+            dagrun = dag.create_dagrun(
+                state=State.RUNNING,
+                execution_date=DEFAULT_DATE,
+                run_type=DagRunType.SCHEDULED,
+                data_interval=dag.infer_automated_data_interval(DEFAULT_DATE),
+                session=session,
+            )
+            task = dag.get_task(task_id="run_this_first")
+            ti = TaskInstance(task, run_id=dagrun.run_id, state=State.RUNNING)
+            session.add(ti)
+            session.flush()
+
+            simple_ti = SimpleTaskInstance.from_ti(ti)
+            ti.try_number += 1
+            session.flush()
+
+            requests = [
+                TaskCallbackRequest(
+                    full_filepath="A",
+                    simple_task_instance=simple_ti,
+                    msg="Message",
+                )
+            ]
+            dag_file_processor.execute_callbacks(dagbag, requests, dag_file_processor.UNIT_TEST_MODE, session)
+
+        mock_ti_handle_failure.assert_not_called()
+        mock_logger.return_value.warning.assert_called_once_with(
+            "Skipping stale failure callback for %s: callback try_number=%s, current try_number=%s",
+            ti,
+            simple_ti.try_number,
+            ti.try_number,
+        )
+
     @pytest.mark.skip_if_database_isolation_mode  # Test is broken in db isolation mode
     @pytest.mark.parametrize(
         ["has_serialized_dag"],

Reply via email to