This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8da26cce48f Fix N+1 query: add joinedload for asset in
dags_needing_dagruns() (#60957)
8da26cce48f is described below
commit 8da26cce48f922e91450dee4a70fde8ae477ea70
Author: Kaxil Naik <[email protected]>
AuthorDate: Thu Jan 22 23:29:02 2026 +0000
Fix N+1 query: add joinedload for asset in dags_needing_dagruns() (#60957)
Add `joinedload(AssetDagRunQueue.asset)` so each queued ADRQ row's asset is
eager-loaded in the same query, avoiding an N+1 query pattern when `adrq.asset`
is accessed in the `dag_statuses` dict comprehension.
---
airflow-core/src/airflow/models/dag.py | 7 ++++++-
airflow-core/tests/unit/models/test_dag.py | 28 ++++++++++++++++++++++++++++
2 files changed, 34 insertions(+), 1 deletion(-)
diff --git a/airflow-core/src/airflow/models/dag.py b/airflow-core/src/airflow/models/dag.py
index 92d2e46e4f7..12bb7b0e875 100644
--- a/airflow-core/src/airflow/models/dag.py
+++ b/airflow-core/src/airflow/models/dag.py
@@ -630,7 +630,12 @@ class DagModel(Base):
# this loads all the ADRQ records.... may need to limit num dags
adrq_by_dag: dict[str, list[AssetDagRunQueue]] = defaultdict(list)
- for adrq in
session.scalars(select(AssetDagRunQueue).options(joinedload(AssetDagRunQueue.dag_model))):
+ for adrq in session.scalars(
+ select(AssetDagRunQueue).options(
+ joinedload(AssetDagRunQueue.dag_model),
+ joinedload(AssetDagRunQueue.asset),
+ )
+ ):
if adrq.dag_model.asset_expression is None:
# The dag referenced does not actually depend on an asset! This
# could happen if the dag DID depend on an asset at some point,
diff --git a/airflow-core/tests/unit/models/test_dag.py b/airflow-core/tests/unit/models/test_dag.py
index 20901fed9bd..1853b4402d1 100644
--- a/airflow-core/tests/unit/models/test_dag.py
+++ b/airflow-core/tests/unit/models/test_dag.py
@@ -1998,6 +1998,34 @@ class TestDagModel:
dag_models = query.all()
assert dag_models == [dag_model]
+ def test_dags_needing_dagruns_query_count(self, dag_maker, session):
+ """Test that dags_needing_dagruns avoids N+1 on adrq.asset access."""
+ num_assets = 10
+ assets = [Asset(uri=f"test://asset{i}", group="test-group") for i in
range(num_assets)]
+
+ with dag_maker(
+ session=session,
+ dag_id="my_dag",
+ max_active_runs=10,
+ schedule=assets,
+ start_date=pendulum.now().add(days=-2),
+ ):
+ EmptyOperator(task_id="dummy")
+
+ dag_model = dag_maker.dag_model
+ asset_models = dag_model.schedule_assets
+ assert len(asset_models) == num_assets
+ for asset_model in asset_models:  # queue one ADRQ row per scheduled asset
+ session.add(AssetDagRunQueue(asset_id=asset_model.id,
target_dag_id=dag_model.dag_id))
+ session.flush()
+
+ # Expire every loaded instance (identity map is kept) so adrq.asset must be reloaded from the DB, exposing any N+1
+ session.expire_all()
+
+ with assert_queries_count(6):  # fixed budget; an N+1 on adrq.asset would add ~1 query per ADRQ
+ query, _ = DagModel.dags_needing_dagruns(session)
+ query.all()  # also execute the returned query so its cost is counted
+
def test_dags_needing_dagruns_asset_aliases(self, dag_maker, session):
# link asset_alias hello_alias to asset hello
asset_model = AssetModel(uri="hello")