This is an automated email from the ASF dual-hosted git repository.

ephraimbuddy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 04f19d19b07 Fix scheduler crashloop when last task instance predates Dag versioning (#68253)
04f19d19b07 is described below

commit 04f19d19b079aeefbf51cf31aa643c60b5951b34
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Wed Jun 10 07:32:35 2026 +0100

    Fix scheduler crashloop when last task instance predates Dag versioning (#68253)
    
    * Fix scheduler crashloop when last task instance predates Dag versioning
    
    Historical task instances created before the dag_version table existed
    have dag_version_id=None. When the scheduler built a dag callback context
    with such a TI as last_ti, the non-null UUID datamodel validation raised
    a ValidationError and crashlooped the scheduler. Drop last_ti in that
    case so the callback still fires with a minimal context.
    
    closes: #68248
    
    * Remove issue number from test docstrings
    
    Test docstrings should describe behavior, not track tickets.
---
 airflow-core/src/airflow/models/dagrun.py     | 12 +++++++
 airflow-core/tests/unit/models/test_dagrun.py | 47 +++++++++++++++++++++++++++
 2 files changed, 59 insertions(+)

diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py
index 8f5784267f5..364b6f0953b 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -1376,6 +1376,18 @@ class DagRun(Base, LoggingMixin):
         execute: bool = False,
     ) -> DagCallbackRequest | None:
         """Create a callback request for the DAG, or execute the callbacks directly if instructed, and return None."""
+        # Historical task instances created before the dag_version table existed (migration
+        # 0047_3_0_0_add_dag_versioning) have dag_version_id=None. The TaskInstance datamodel used to
+        # build the callback context requires a non-null UUID, so passing such a TI as last_ti would
+        # raise a ValidationError and crash the scheduler. Drop last_ti in that case; the callback still
+        # fires with a minimal context (dag, run_id, reason).
+        if relevant_ti is not None and relevant_ti.dag_version_id is None:
+            self.log.warning(
+                "Task instance %s has no dag_version_id (pre-versioning record); "
+                "omitting last_ti from the dag callback context.",
+                relevant_ti,
+            )
+            relevant_ti = None
         if not execute:
             return DagCallbackRequest(
                 filepath=self.dag_model.relative_fileloc,
diff --git a/airflow-core/tests/unit/models/test_dagrun.py b/airflow-core/tests/unit/models/test_dagrun.py
index 4a919a311a2..c33f3b007bf 100644
--- a/airflow-core/tests/unit/models/test_dagrun.py
+++ b/airflow-core/tests/unit/models/test_dagrun.py
@@ -3913,6 +3913,53 @@ class TestDagRunHandleDagCallback:
         assert context_received["ti"].dag_id == "test_dag"
         assert context_received["ti"].run_id == dr.run_id
 
+    def test_produce_dag_callback_drops_last_ti_without_dag_version(self, dag_maker, session):
+        """A historical TI with dag_version_id=None must not crash callback construction."""
+        with dag_maker("test_dag", session=session) as dag:
+            BashOperator(task_id="test_task", bash_command="echo 1")
+
+        dr = dag_maker.create_dagrun()
+        dr.dag_model = DagModel.get_dagmodel(dag.dag_id, session=session)
+        ti = dr.get_task_instance("test_task")
+        # Simulate a task instance created before the dag_version table existed.
+        ti.dag_version_id = None
+        session.flush()
+
+        callback = dr.produce_dag_callback(dag=dag, success=False, relevant_ti=ti, reason="task_failure")
+
+        assert callback is not None
+        # last_ti is dropped so the non-null UUID datamodel validation never fires.
+        assert callback.context_from_server is not None
+        assert callback.context_from_server.last_ti is None
+
+    def test_execute_dag_callbacks_without_dag_version(self, dag_maker, session):
+        """The execute=True path must also tolerate a TI with dag_version_id=None."""
+        context_received = None
+
+        def on_failure(context):
+            nonlocal context_received
+            context_received = context
+
+        with dag_maker("test_dag", session=session, on_failure_callback=on_failure) as dag:
+            BashOperator(task_id="test_task", bash_command="echo 1")
+
+        dr = dag_maker.create_dagrun()
+        dr.dag_model = DagModel.get_dagmodel(dag.dag_id, session=session)
+        ti = dr.get_task_instance("test_task")
+        ti.dag_version_id = None
+        session.flush()
+
+        dag.on_failure_callback = on_failure
+        dag.has_on_failure_callback = True
+
+        dr.produce_dag_callback(dag=dag, success=False, relevant_ti=ti, reason="task_failure", execute=True)
+
+        # Callback still fires with the minimal fallback context (no last_ti template vars).
+        assert context_received is not None
+        assert context_received["reason"] == "task_failure"
+        assert "ti" not in context_received
+        assert context_received["run_id"] == dr.run_id
+
 
 class TestDagRunTracing:
     """Tests for DagRun OpenTelemetry span behavior."""

Reply via email to