This is an automated email from the ASF dual-hosted git repository.
ephraimbuddy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 04f19d19b07 Fix scheduler crashloop when last task instance predates Dag versioning (#68253)
04f19d19b07 is described below
commit 04f19d19b079aeefbf51cf31aa643c60b5951b34
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Wed Jun 10 07:32:35 2026 +0100
Fix scheduler crashloop when last task instance predates Dag versioning (#68253)
* Fix scheduler crashloop when last task instance predates Dag versioning
Historical task instances created before the dag_version table existed
have dag_version_id=None. When the scheduler built a dag callback context
with such a TI as last_ti, the non-null UUID datamodel validation raised
a ValidationError and crashlooped the scheduler. Drop last_ti in that
case so the callback still fires with a minimal context.
closes: #68248
* Remove issue number from test docstrings
Test docstrings should describe behavior, not track tickets.
---
airflow-core/src/airflow/models/dagrun.py | 12 +++++++
airflow-core/tests/unit/models/test_dagrun.py | 47 +++++++++++++++++++++++++++
2 files changed, 59 insertions(+)
diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py
index 8f5784267f5..364b6f0953b 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -1376,6 +1376,18 @@ class DagRun(Base, LoggingMixin):
execute: bool = False,
) -> DagCallbackRequest | None:
"""Create a callback request for the DAG, or execute the callbacks directly if instructed, and return None."""
+ # Historical task instances created before the dag_version table existed (migration
+ # 0047_3_0_0_add_dag_versioning) have dag_version_id=None. The TaskInstance datamodel used to
+ # build the callback context requires a non-null UUID, so passing such a TI as last_ti would
+ # raise a ValidationError and crash the scheduler. Drop last_ti in that case; the callback still
+ # fires with a minimal context (dag, run_id, reason).
+ if relevant_ti is not None and relevant_ti.dag_version_id is None:
+ self.log.warning(
+ "Task instance %s has no dag_version_id (pre-versioning record); "
+ "omitting last_ti from the dag callback context.",
+ relevant_ti,
+ )
+ relevant_ti = None
if not execute:
return DagCallbackRequest(
filepath=self.dag_model.relative_fileloc,
diff --git a/airflow-core/tests/unit/models/test_dagrun.py b/airflow-core/tests/unit/models/test_dagrun.py
index 4a919a311a2..c33f3b007bf 100644
--- a/airflow-core/tests/unit/models/test_dagrun.py
+++ b/airflow-core/tests/unit/models/test_dagrun.py
@@ -3913,6 +3913,53 @@ class TestDagRunHandleDagCallback:
assert context_received["ti"].dag_id == "test_dag"
assert context_received["ti"].run_id == dr.run_id
+ def test_produce_dag_callback_drops_last_ti_without_dag_version(self, dag_maker, session):
+ """A historical TI with dag_version_id=None must not crash callback construction."""
+ with dag_maker("test_dag", session=session) as dag:
+ BashOperator(task_id="test_task", bash_command="echo 1")
+
+ dr = dag_maker.create_dagrun()
+ dr.dag_model = DagModel.get_dagmodel(dag.dag_id, session=session)
+ ti = dr.get_task_instance("test_task")
+ # Simulate a task instance created before the dag_version table existed.
+ ti.dag_version_id = None
+ session.flush()
+
+ callback = dr.produce_dag_callback(dag=dag, success=False, relevant_ti=ti, reason="task_failure")
+
+ assert callback is not None
+ # last_ti is dropped so the non-null UUID datamodel validation never fires.
+ assert callback.context_from_server is not None
+ assert callback.context_from_server.last_ti is None
+
+ def test_execute_dag_callbacks_without_dag_version(self, dag_maker, session):
+ """The execute=True path must also tolerate a TI with dag_version_id=None."""
+ context_received = None
+
+ def on_failure(context):
+ nonlocal context_received
+ context_received = context
+
+ with dag_maker("test_dag", session=session, on_failure_callback=on_failure) as dag:
+ BashOperator(task_id="test_task", bash_command="echo 1")
+
+ dr = dag_maker.create_dagrun()
+ dr.dag_model = DagModel.get_dagmodel(dag.dag_id, session=session)
+ ti = dr.get_task_instance("test_task")
+ ti.dag_version_id = None
+ session.flush()
+
+ dag.on_failure_callback = on_failure
+ dag.has_on_failure_callback = True
+
+ dr.produce_dag_callback(dag=dag, success=False, relevant_ti=ti, reason="task_failure", execute=True)
+
+ # Callback still fires with the minimal fallback context (no last_ti template vars).
+ assert context_received is not None
+ assert context_received["reason"] == "task_failure"
+ assert "ti" not in context_received
+ assert context_received["run_id"] == dr.run_id
+
class TestDagRunTracing:
"""Tests for DagRun OpenTelemetry span behavior."""