This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new dd0eab9dd44 Fix memory issue: remove eager loading of all TIs in
scheduler (#60956)
dd0eab9dd44 is described below
commit dd0eab9dd4456dae4f6835040ebb2931508fe8fb
Author: Kaxil Naik <[email protected]>
AuthorDate: Thu Jan 22 23:29:47 2026 +0000
Fix memory issue: remove eager loading of all TIs in scheduler (#60956)
The scheduler's `get_running_dag_runs_to_examine()` was using
`joinedload(DagRun.task_instances)`, which loaded ALL TaskInstances
for each DagRun into memory. This could cause significant memory
pressure for large DAGs with many tasks.
Changes:
- Remove unnecessary `joinedload(task_instances)` from the query
- Convert the TI loop in `_verify_integrity_if_dag_changed` to a
bulk UPDATE statement, avoiding loading TIs into memory entirely
- Add `session.expire()` on the DagRun's `task_instances` relationship so
that the next access reloads it from the database after the bulk UPDATE
This reduces memory usage in the scheduler's hot path, which is
especially beneficial for DAGs with hundreds of tasks.
---
airflow-core/src/airflow/jobs/scheduler_job_runner.py | 18 ++++++++++++++----
airflow-core/src/airflow/models/dagrun.py | 1 -
2 files changed, 14 insertions(+), 5 deletions(-)
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 4851ccfbc47..eac612d8c1e 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -2275,10 +2275,20 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
dag_run.dag = self.scheduler_dag_bag.get_dag_for_run(dag_run=dag_run,
session=session)
if not dag_run.dag:
return False
- # Select all TIs in State.unfinished and update the dag_version_id
- for ti in dag_run.task_instances:
- if ti.state in State.unfinished:
- ti.dag_version = latest_dag_version
+ # Bulk update dag_version_id for unfinished TIs instead of loading all
TIs into memory.
+ # Use synchronize_session=False since we handle cache coherence via
session.expire() below.
+ session.execute(
+ update(TI)
+ .where(
+ TI.dag_id == dag_run.dag_id,
+ TI.run_id == dag_run.run_id,
+ TI.state.in_(State.unfinished),
+ )
+ .values(dag_version_id=latest_dag_version.id),
+ execution_options={"synchronize_session": False},
+ )
+ # Expire task_instances relationship so next access fetches fresh data
from DB
+ session.expire(dag_run, ["task_instances"])
# Verify integrity also takes care of session.flush
dag_run.verify_integrity(dag_version_id=latest_dag_version.id,
session=session)
diff --git a/airflow-core/src/airflow/models/dagrun.py
b/airflow-core/src/airflow/models/dagrun.py
index 5b2b0578c19..54c889439f5 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -602,7 +602,6 @@ class DagRun(Base, LoggingMixin):
DagModel.is_paused == false(),
DagModel.is_stale == false(),
)
- .options(joinedload(cls.task_instances))
.order_by(
nulls_first(cast("ColumnElement[Any]",
BackfillDagRun.sort_ordinal), session=session),
nulls_first(cast("ColumnElement[Any]",
cls.last_scheduling_decision), session=session),