This is an automated email from the ASF dual-hosted git repository. vatsrahul1001 pushed a commit to branch backport-8395c29-v3-2-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 746803879f2c54e62e0162efc2bd5298cd3ea987 Author: Revanth <[email protected]> AuthorDate: Mon Jun 1 07:13:48 2026 -0500 Fix scheduler orphaned task reset logging crash (#67822) The scheduler orphaned-task adoption path loaded task instances with a narrow load_only(...) set and then called repr(ti) while building the reset log message. TaskInstance.__repr__ reads map_index and state, so detached instances with deferred columns could raise DetachedInstanceError and crash the scheduler on restart. --- .../src/airflow/jobs/scheduler_job_runner.py | 12 ++++++- airflow-core/src/airflow/models/taskinstance.py | 21 +++++++++--- airflow-core/tests/unit/jobs/test_scheduler_job.py | 36 ++++++++++++++++++- .../tests/unit/models/test_taskinstance.py | 40 +++++++++++++++++++++- 4 files changed, 102 insertions(+), 7 deletions(-) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 9e1fb20165c..32a8ce6ce9a 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -2843,7 +2843,17 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin): .where(Job.state.is_distinct_from(JobState.RUNNING)) .join(TI.dag_run) .where(DagRun.state == DagRunState.RUNNING) - .options(load_only(TI.dag_id, TI.task_id, TI.run_id, TI.external_executor_id)) + .options( + load_only( + TI.id, + TI.dag_id, + TI.task_id, + TI.run_id, + TI.map_index, + TI.state, + TI.external_executor_id, + ) + ) ) # Lock these rows, so that another scheduler can't try and adopt these too diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index 026979ee342..edbb09b5188 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -66,6 +66,7 @@ from sqlalchemy.ext.hybrid import hybrid_property from sqlalchemy.ext.mutable import MutableDict from sqlalchemy.orm import 
Mapped, lazyload, mapped_column, reconstructor, relationship from sqlalchemy.orm.attributes import NO_VALUE, set_committed_value +from sqlalchemy.orm.exc import DetachedInstanceError, ObjectDeletedError from airflow import settings from airflow._shared.observability.metrics.dual_stats_manager import DualStatsManager @@ -1128,10 +1129,22 @@ class TaskInstance(Base, LoggingMixin, BaseWorkload): yield dep_status def __repr__(self) -> str: - prefix = f"<TaskInstance: {self.dag_id}.{self.task_id} {self.run_id} " - if self.map_index != -1: - prefix += f"map_index={self.map_index} " - return prefix + f"[{self.state}] ti_id={self.id}>" + # ``__repr__`` is used in logging and must never raise. Real values are printed + # whenever they can be read (including a normal lazy-load on an *attached* instance); + # we only fall back to a placeholder when SQLAlchemy cannot produce the value at all: + # a deferred column on a *detached* instance (DetachedInstanceError), or a row deleted + # out from under an expired instance (ObjectDeletedError). 
+ def field(name: str) -> Any: + try: + return getattr(self, name) + except (DetachedInstanceError, ObjectDeletedError): + return "<deferred>" + + prefix = f"<TaskInstance: {field('dag_id')}.{field('task_id')} {field('run_id')} " + map_index = field("map_index") + if map_index != -1: + prefix += f"map_index={map_index} " + return prefix + f"[{field('state')}] ti_id={field('id')}>" def next_retry_datetime(self): """ diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index efe94326696..d3766788fec 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -36,7 +36,7 @@ import pendulum import psutil import pytest import time_machine -from sqlalchemy import delete, func, select, update +from sqlalchemy import delete, func, inspect, select, update from sqlalchemy.dialects import mysql from sqlalchemy.orm import joinedload @@ -3265,6 +3265,40 @@ class TestSchedulerJob: ti2 = dr2.get_task_instance(task_id=op1.task_id, session=session) assert ti2.state == State.NONE, "Tasks run by Backfill Jobs should be treated the same" + def test_adopt_or_reset_orphaned_tasks_loads_state_for_reset_logging( + self, dag_maker, session, mock_executor + ): + with dag_maker("test_adopt_or_reset_orphaned_tasks_loads_state_for_reset_logging", session=session): + op1 = EmptyOperator(task_id="op1") + + scheduler_job = Job() + session.add(scheduler_job) + session.flush() + + dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED) + ti = dr.get_task_instance(task_id=op1.task_id, session=session) + ti.state = State.QUEUED + ti.queued_by_job_id = scheduler_job.id + session.commit() + session.expunge_all() + + def refuse_adoption(tis): + assert len(tis) == 1 + # ``repr(ti)`` in the reset path reads both ``state`` and ``map_index``; the query + # must load both so the reset log stays accurate (and never lazy-loads on detach). 
+ unloaded = inspect(tis[0]).unloaded + assert "state" not in unloaded + assert "map_index" not in unloaded + # repr must render the real state, not the ``<deferred>`` fallback. + assert "queued" in repr(tis[0]) + return tis + + mock_executor.try_adopt_task_instances.side_effect = refuse_adoption + + self.job_runner = SchedulerJobRunner(job=Job(), num_runs=0) + + assert self.job_runner.adopt_or_reset_orphaned_tasks(session=session) == 1 + def test_adopt_or_reset_orphaned_tasks_multiple_executors(self, dag_maker, mock_executors): """ Test that with multiple executors configured tasks are sorted correctly and handed off to the diff --git a/airflow-core/tests/unit/models/test_taskinstance.py b/airflow-core/tests/unit/models/test_taskinstance.py index 8e3cfc7c028..d3878b26f1c 100644 --- a/airflow-core/tests/unit/models/test_taskinstance.py +++ b/airflow-core/tests/unit/models/test_taskinstance.py @@ -33,8 +33,9 @@ import uuid6 from opentelemetry import trace as otel_trace from opentelemetry.sdk.trace import TracerProvider from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator -from sqlalchemy import delete, func, select +from sqlalchemy import delete, func, inspect as sa_inspect, select from sqlalchemy.exc import IntegrityError +from sqlalchemy.orm import load_only from sqlalchemy.orm.attributes import set_committed_value from airflow import settings @@ -3527,3 +3528,40 @@ def test_clear_task_instances_resets_context_carrier(dag_maker, session): assert ti.context_carrier["traceparent"] != original_ti_traceparent assert dag_run.context_carrier["traceparent"] != original_dr_traceparent + + +@pytest.mark.db_test +def test_task_instance_repr_does_not_raise_for_deferred_columns(dag_maker, session): + """``TaskInstance.__repr__`` must survive *any* deferred column it reads. + + Regression test for issue #67813: the scheduler's orphaned-task adoption loaded + TaskInstances via ``load_only`` and then called ``repr(ti)`` on detached instances.
+ ``__repr__`` reads ``map_index`` and ``state`` (among others); on a detached instance + a column that was not loaded raises ``DetachedInstanceError``. ``__repr__`` is used in + logging and must degrade gracefully — printing ``<deferred>`` — instead of crashing. + """ + with dag_maker("test_repr_deferred_columns", session=session): + EmptyOperator(task_id="op1") + dr = dag_maker.create_dagrun() + ti = dr.get_task_instance(task_id="op1", session=session) + ti.state = State.QUEUED + session.commit() + ti_id = ti.id + + # Reload the row with ``map_index`` and ``state`` left as deferred columns, then detach + # the instance so that touching them would otherwise require a (now-impossible) DB load. + session.expunge_all() + reloaded = session.scalar( + select(TaskInstance) + .where(TaskInstance.id == ti_id) + .options(load_only(TaskInstance.dag_id, TaskInstance.task_id, TaskInstance.run_id)) + ) + session.expunge(reloaded) + unloaded = sa_inspect(reloaded).unloaded + assert "map_index" in unloaded + assert "state" in unloaded + + result = repr(reloaded) # would raise DetachedInstanceError without the guard + + assert "<deferred>" in result + assert "[queued]" not in result
