This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new dd0eab9dd44 Fix memory issue: remove eager loading of all TIs in scheduler (#60956)
dd0eab9dd44 is described below

commit dd0eab9dd4456dae4f6835040ebb2931508fe8fb
Author: Kaxil Naik <[email protected]>
AuthorDate: Thu Jan 22 23:29:47 2026 +0000

    Fix memory issue: remove eager loading of all TIs in scheduler (#60956)
    
    The scheduler's `get_running_dag_runs_to_examine()` was using
    `joinedload(DagRun.task_instances)` which loaded ALL TaskInstances
    for each DagRun into memory. This could cause significant memory
    pressure with large DAGs having many tasks.
    
    Changes:
    - Remove unnecessary `joinedload(task_instances)` from the query
    - Convert the TI loop in `_verify_integrity_if_dag_changed` to a
      bulk UPDATE statement, avoiding loading TIs into memory entirely
    - Add `session.expire()` to ensure relationship cache coherence
    
    This reduces memory usage in the scheduler's hot path, which is
    especially beneficial for DAGs with hundreds of tasks.
---
 airflow-core/src/airflow/jobs/scheduler_job_runner.py | 18 ++++++++++++++----
 airflow-core/src/airflow/models/dagrun.py             |  1 -
 2 files changed, 14 insertions(+), 5 deletions(-)

diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 4851ccfbc47..eac612d8c1e 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -2275,10 +2275,20 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         dag_run.dag = self.scheduler_dag_bag.get_dag_for_run(dag_run=dag_run, 
session=session)
         if not dag_run.dag:
             return False
-        # Select all TIs in State.unfinished and update the dag_version_id
-        for ti in dag_run.task_instances:
-            if ti.state in State.unfinished:
-                ti.dag_version = latest_dag_version
+        # Bulk update dag_version_id for unfinished TIs instead of loading all 
TIs into memory.
+        # Use synchronize_session=False since we handle cache coherence via 
session.expire() below.
+        session.execute(
+            update(TI)
+            .where(
+                TI.dag_id == dag_run.dag_id,
+                TI.run_id == dag_run.run_id,
+                TI.state.in_(State.unfinished),
+            )
+            .values(dag_version_id=latest_dag_version.id),
+            execution_options={"synchronize_session": False},
+        )
+        # Expire task_instances relationship so next access fetches fresh data 
from DB
+        session.expire(dag_run, ["task_instances"])
         # Verify integrity also takes care of session.flush
         dag_run.verify_integrity(dag_version_id=latest_dag_version.id, 
session=session)
 
diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py
index 5b2b0578c19..54c889439f5 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -602,7 +602,6 @@ class DagRun(Base, LoggingMixin):
                 DagModel.is_paused == false(),
                 DagModel.is_stale == false(),
             )
-            .options(joinedload(cls.task_instances))
             .order_by(
                 nulls_first(cast("ColumnElement[Any]", 
BackfillDagRun.sort_ordinal), session=session),
                 nulls_first(cast("ColumnElement[Any]", 
cls.last_scheduling_decision), session=session),

Reply via email to