This is an automated email from the ASF dual-hosted git repository.
ashb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8395c29ffd5 Fix scheduler orphaned task reset logging crash (#67822)
8395c29ffd5 is described below
commit 8395c29ffd518e2daf7f23094d0c76e95adb6256
Author: Revanth <[email protected]>
AuthorDate: Mon Jun 1 07:13:48 2026 -0500
Fix scheduler orphaned task reset logging crash (#67822)
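The scheduler's orphaned-task adoption path loaded task instances with a
narrow load_only(...) column set and then called repr(ti) while building the
reset log message. TaskInstance.__repr__ reads map_index and state, so detached
instances whose columns had been deferred could raise DetachedInstanceError and
crash the scheduler on restart.

The adoption query now also loads id, map_index and state, and
TaskInstance.__repr__ falls back to a "<deferred>" placeholder instead of
raising when an attribute cannot be loaded (DetachedInstanceError or
ObjectDeletedError).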
---
.../src/airflow/jobs/scheduler_job_runner.py | 12 ++++++-
airflow-core/src/airflow/models/taskinstance.py | 21 +++++++++---
airflow-core/tests/unit/jobs/test_scheduler_job.py | 36 ++++++++++++++++++-
.../tests/unit/models/test_taskinstance.py | 40 +++++++++++++++++++++-
4 files changed, 102 insertions(+), 7 deletions(-)
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index f61e20840ed..ff9e5921d44 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -2851,7 +2851,17 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
.where(Job.state.is_distinct_from(JobState.RUNNING))
.join(TI.dag_run)
.where(DagRun.state == DagRunState.RUNNING)
- .options(load_only(TI.dag_id, TI.task_id, TI.run_id,
TI.external_executor_id))
+ .options(
+ load_only(
+ TI.id,
+ TI.dag_id,
+ TI.task_id,
+ TI.run_id,
+ TI.map_index,
+ TI.state,
+ TI.external_executor_id,
+ )
+ )
)
# Lock these rows, so that another scheduler can't try and
adopt these too
diff --git a/airflow-core/src/airflow/models/taskinstance.py
b/airflow-core/src/airflow/models/taskinstance.py
index f9d498c1ab1..3469cf4acb8 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -66,6 +66,7 @@ from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.ext.mutable import MutableDict
from sqlalchemy.orm import Mapped, lazyload, mapped_column, reconstructor,
relationship
from sqlalchemy.orm.attributes import NO_VALUE, set_committed_value
+from sqlalchemy.orm.exc import DetachedInstanceError, ObjectDeletedError
from airflow import settings
from airflow._shared.observability.metrics import stats
@@ -1158,10 +1159,22 @@ class TaskInstance(Base, LoggingMixin, BaseWorkload):
yield dep_status
def __repr__(self) -> str:
- prefix = f"<TaskInstance: {self.dag_id}.{self.task_id} {self.run_id} "
- if self.map_index != -1:
- prefix += f"map_index={self.map_index} "
- return prefix + f"[{self.state}] ti_id={self.id}>"
+ # ``__repr__`` is used in logging and must never raise. Real values
are printed
+ # whenever they can be read (including a normal lazy-load on an
*attached* instance);
+ # we only fall back to a placeholder when SQLAlchemy cannot produce
the value at all:
+ # a deferred column on a *detached* instance (DetachedInstanceError),
or a row deleted
+ # out from under an expired instance (ObjectDeletedError).
+ def field(name: str) -> Any:
+ try:
+ return getattr(self, name)
+ except (DetachedInstanceError, ObjectDeletedError):
+ return "<deferred>"
+
+ prefix = f"<TaskInstance: {field('dag_id')}.{field('task_id')}
{field('run_id')} "
+ map_index = field("map_index")
+ if map_index != -1:
+ prefix += f"map_index={map_index} "
+ return prefix + f"[{field('state')}] ti_id={field('id')}>"
def next_retry_datetime(self):
"""
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 3b576271610..fc267802252 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -36,7 +36,7 @@ import pendulum
import psutil
import pytest
import time_machine
-from sqlalchemy import delete, func, select, update
+from sqlalchemy import delete, func, inspect, select, update
from sqlalchemy.dialects import mysql
from sqlalchemy.orm import joinedload
@@ -3297,6 +3297,40 @@ class TestSchedulerJob:
ti2 = dr2.get_task_instance(task_id=op1.task_id, session=session)
assert ti2.state == State.NONE, "Tasks run by Backfill Jobs should be
treated the same"
+ def test_adopt_or_reset_orphaned_tasks_loads_state_for_reset_logging(
+ self, dag_maker, session, mock_executor
+ ):
+ with
dag_maker("test_adopt_or_reset_orphaned_tasks_loads_state_for_reset_logging",
session=session):
+ op1 = EmptyOperator(task_id="op1")
+
+ scheduler_job = Job()
+ session.add(scheduler_job)
+ session.flush()
+
+ dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
+ ti = dr.get_task_instance(task_id=op1.task_id, session=session)
+ ti.state = State.QUEUED
+ ti.queued_by_job_id = scheduler_job.id
+ session.commit()
+ session.expunge_all()
+
+ def refuse_adoption(tis):
+ assert len(tis) == 1
+ # ``repr(ti)`` in the reset path reads both ``state`` and
``map_index``; the query
+ # must load both so the reset log stays accurate (and never
lazy-loads on detach).
+ unloaded = inspect(tis[0]).unloaded
+ assert "state" not in unloaded
+ assert "map_index" not in unloaded
+ # repr must render the real state, not the ``<deferred>`` fallback.
+ assert "queued" in repr(tis[0])
+ return tis
+
+ mock_executor.try_adopt_task_instances.side_effect = refuse_adoption
+
+ self.job_runner = SchedulerJobRunner(job=Job(), num_runs=0)
+
+ assert self.job_runner.adopt_or_reset_orphaned_tasks(session=session)
== 1
+
def test_adopt_or_reset_orphaned_tasks_multiple_executors(self, dag_maker,
mock_executors):
"""
Test that with multiple executors configured tasks are sorted
correctly and handed off to the
diff --git a/airflow-core/tests/unit/models/test_taskinstance.py
b/airflow-core/tests/unit/models/test_taskinstance.py
index dae7f419ff4..99cdd38f4f3 100644
--- a/airflow-core/tests/unit/models/test_taskinstance.py
+++ b/airflow-core/tests/unit/models/test_taskinstance.py
@@ -33,8 +33,9 @@ import uuid6
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.trace.propagation.tracecontext import
TraceContextTextMapPropagator
-from sqlalchemy import delete, func, select
+from sqlalchemy import delete, func, inspect as sa_inspect, select
from sqlalchemy.exc import IntegrityError
+from sqlalchemy.orm import load_only
from sqlalchemy.orm.attributes import set_committed_value
from airflow import settings
@@ -3897,3 +3898,40 @@ def
test_clear_task_instances_preserves_detail_level(dag_maker, session):
new_ctx = TraceContextTextMapPropagator().extract(dag_run.context_carrier)
span = trace.get_current_span(new_ctx)
assert get_task_span_detail_level(span) == 2
+
+
[email protected]_test
+def test_task_instance_repr_does_not_raise_for_deferred_columns(dag_maker,
session):
+ """``TaskInstance.__repr__`` must survive *any* deferred column it reads.
+
+ Regression test for issue #67813: the scheduler's orphaned-task adoption
loaded
+ TaskInstances via ``load_only`` and then called ``repr(ti)`` on detached
instances.
+ ``__repr__`` reads ``map_index`` and ``state`` (among others); on a
detached instance
+ a column that was not loaded raises ``DetachedInstanceError``.
``__repr__`` is used in
+ logging and must degrade gracefully — printing ``<deferred>`` — instead of
crashing.
+ """
+ with dag_maker("test_repr_deferred_columns", session=session):
+ EmptyOperator(task_id="op1")
+ dr = dag_maker.create_dagrun()
+ ti = dr.get_task_instance(task_id="op1", session=session)
+ ti.state = State.QUEUED
+ session.commit()
+ ti_id = ti.id
+
+ # Reload the row with ``map_index`` and ``state`` left as deferred
columns, then detach
+ # the instance so that touching them would otherwise require a
(now-impossible) DB load.
+ session.expunge_all()
+ reloaded = session.scalar(
+ select(TaskInstance)
+ .where(TaskInstance.id == ti_id)
+ .options(load_only(TaskInstance.dag_id, TaskInstance.task_id,
TaskInstance.run_id))
+ )
+ session.expunge(reloaded)
+ unloaded = sa_inspect(reloaded).unloaded
+ assert "map_index" in unloaded
+ assert "state" in unloaded
+
+ result = repr(reloaded) # would raise DetachedInstanceError without the
guard
+
+ assert "<deferred>" in result
+ assert "[queued]" not in result