This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8da26cce48f Fix N+1 query: add joinedload for asset in dags_needing_dagruns() (#60957)
8da26cce48f is described below

commit 8da26cce48f922e91450dee4a70fde8ae477ea70
Author: Kaxil Naik <[email protected]>
AuthorDate: Thu Jan 22 23:29:02 2026 +0000

    Fix N+1 query: add joinedload for asset in dags_needing_dagruns() (#60957)
    
    Add `joinedload(AssetDagRunQueue.asset)` to avoid N+1 when accessing
    `adrq.asset` in the dag_statuses dict comprehension.
---
 airflow-core/src/airflow/models/dag.py     |  7 ++++++-
 airflow-core/tests/unit/models/test_dag.py | 28 ++++++++++++++++++++++++++++
 2 files changed, 34 insertions(+), 1 deletion(-)

diff --git a/airflow-core/src/airflow/models/dag.py b/airflow-core/src/airflow/models/dag.py
index 92d2e46e4f7..12bb7b0e875 100644
--- a/airflow-core/src/airflow/models/dag.py
+++ b/airflow-core/src/airflow/models/dag.py
@@ -630,7 +630,12 @@ class DagModel(Base):
 
         # this loads all the ADRQ records.... may need to limit num dags
         adrq_by_dag: dict[str, list[AssetDagRunQueue]] = defaultdict(list)
-        for adrq in session.scalars(select(AssetDagRunQueue).options(joinedload(AssetDagRunQueue.dag_model))):
+        for adrq in session.scalars(
+            select(AssetDagRunQueue).options(
+                joinedload(AssetDagRunQueue.dag_model),
+                joinedload(AssetDagRunQueue.asset),
+            )
+        ):
             if adrq.dag_model.asset_expression is None:
                 # The dag referenced does not actually depend on an asset! This
                 # could happen if the dag DID depend on an asset at some point,
diff --git a/airflow-core/tests/unit/models/test_dag.py b/airflow-core/tests/unit/models/test_dag.py
index 20901fed9bd..1853b4402d1 100644
--- a/airflow-core/tests/unit/models/test_dag.py
+++ b/airflow-core/tests/unit/models/test_dag.py
@@ -1998,6 +1998,34 @@ class TestDagModel:
         dag_models = query.all()
         assert dag_models == [dag_model]
 
+    def test_dags_needing_dagruns_query_count(self, dag_maker, session):
+        """Test that dags_needing_dagruns avoids N+1 on adrq.asset access."""
+        num_assets = 10
+        assets = [Asset(uri=f"test://asset{i}", group="test-group") for i in range(num_assets)]
+
+        with dag_maker(
+            session=session,
+            dag_id="my_dag",
+            max_active_runs=10,
+            schedule=assets,
+            start_date=pendulum.now().add(days=-2),
+        ):
+            EmptyOperator(task_id="dummy")
+
+        dag_model = dag_maker.dag_model
+        asset_models = dag_model.schedule_assets
+        assert len(asset_models) == num_assets
+        for asset_model in asset_models:
+            session.add(AssetDagRunQueue(asset_id=asset_model.id, target_dag_id=dag_model.dag_id))
+        session.flush()
+
+        # Clear identity map so N+1 on adrq.asset is exposed
+        session.expire_all()
+
+        with assert_queries_count(6):
+            query, _ = DagModel.dags_needing_dagruns(session)
+            query.all()
+
     def test_dags_needing_dagruns_asset_aliases(self, dag_maker, session):
         # link asset_alias hello_alias to asset hello
         asset_model = AssetModel(uri="hello")

Reply via email to