This is an automated email from the ASF dual-hosted git repository.

jason810496 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 4c71e37848c Avoid lazy-loading timetable fields for latest DagRuns (#66488)
4c71e37848c is described below

commit 4c71e37848ce19d83bb9a62267afe440cbb50e5c
Author: Henry Chen <[email protected]>
AuthorDate: Tue Jun 2 21:31:44 2026 +0800

    Avoid lazy-loading timetable fields for latest DagRuns (#66488)
---
 .../src/airflow/dag_processing/collection.py       |  6 ++++
 .../tests/unit/dag_processing/test_collection.py   | 42 +++++++++++++++++++---
 2 files changed, 44 insertions(+), 4 deletions(-)

diff --git a/airflow-core/src/airflow/dag_processing/collection.py b/airflow-core/src/airflow/dag_processing/collection.py
index 785763bf9d8..7cdf490d938 100644
--- a/airflow-core/src/airflow/dag_processing/collection.py
+++ b/airflow-core/src/airflow/dag_processing/collection.py
@@ -129,8 +129,11 @@ def _get_latest_runs_stmt(dag_id: str) -> Select:
             load_only(
                 DagRun.dag_id,
                 DagRun.logical_date,
+                DagRun.run_after,
                 DagRun.data_interval_start,
                 DagRun.data_interval_end,
+                DagRun.partition_key,
+                DagRun.partition_date,
             )
         )
     )
@@ -165,8 +168,11 @@ def _get_latest_runs_stmt_partitioned(dag_id: str) -> Select:
             load_only(
                 DagRun.dag_id,
                 DagRun.logical_date,
+                DagRun.run_after,
                 DagRun.data_interval_start,
                 DagRun.data_interval_end,
+                DagRun.partition_key,
+                DagRun.partition_date,
             )
         )
     )
diff --git a/airflow-core/tests/unit/dag_processing/test_collection.py b/airflow-core/tests/unit/dag_processing/test_collection.py
index 8a536dbb12a..bc7a1490e16 100644
--- a/airflow-core/tests/unit/dag_processing/test_collection.py
+++ b/airflow-core/tests/unit/dag_processing/test_collection.py
@@ -27,7 +27,7 @@ from unittest import mock
 from unittest.mock import patch
 
 import pytest
-from sqlalchemy import delete, func, select
+from sqlalchemy import delete, func, inspect as sa_inspect, select
 from sqlalchemy.exc import OperationalError, SAWarning
 
 import airflow.dag_processing.collection
@@ -89,8 +89,9 @@ def test_statement_latest_runs_one_dag():
         compiled_stmt = str(stmt.compile())
         actual = [x.strip() for x in compiled_stmt.splitlines()]
         expected = [
-            "SELECT dag_run.id, dag_run.dag_id, dag_run.logical_date, "
-            "dag_run.data_interval_start, dag_run.data_interval_end",
+            "SELECT dag_run.id, dag_run.dag_id, dag_run.logical_date, dag_run.data_interval_start, "
+            "dag_run.data_interval_end, dag_run.run_after, dag_run.partition_key, "
+            "dag_run.partition_date",
             "FROM dag_run",
             "WHERE dag_run.dag_id = :dag_id_1 AND dag_run.logical_date = ("
             "SELECT max(dag_run.logical_date) AS max_logical_date",
@@ -101,11 +102,38 @@ def test_statement_latest_runs_one_dag():
 
 
 @pytest.mark.db_test
-def test_statement_latest_runs_partitioned_sorted_by_partition_date(dag_maker, session):
+def test_statement_latest_runs_loads_timetable_fields(dag_maker, session):
     with dag_maker("fake-dag", schedule=None):
         pass
     dag_maker.sync_dagbag_to_db()
 
+    logical_date = tz.datetime(2025, 1, 1)
+    run_after = tz.datetime(2025, 1, 2)
+
+    dag_maker.create_dagrun(
+        run_id="latest-run",
+        logical_date=logical_date,
+        data_interval=(logical_date, run_after),
+        run_type=DagRunType.SCHEDULED,
+        run_after=run_after,
+        session=session,
+    )
+    session.flush()
+    session.expunge_all()  # Ensure we load from DB, not from session cache
+
+    latest = session.scalar(_get_latest_runs_stmt("fake-dag"))
+    assert latest is not None
+    assert {"run_after", "partition_date", "partition_key"}.isdisjoint(sa_inspect(latest).unloaded)
+    assert latest.run_after == run_after
+    assert latest.partition_key is None
+    assert latest.partition_date is None
+
+
+@pytest.mark.db_test
+def test_statement_latest_runs_partitioned_sorted_by_partition_date(dag_maker, session):
+    with dag_maker("fake-dag", schedule=None):
+        pass
+    dag_maker.sync_dagbag_to_db()
     for i, (run_id, partition_key, partition_date) in enumerate(
         (
             ("newest-partition-date", "2025-01-02", tz.datetime(2025, 1, 2)),
@@ -124,8 +152,14 @@ def test_statement_latest_runs_partitioned_sorted_by_partition_date(dag_maker, s
             session=session,
         )
 
+    session.flush()
+    session.expunge_all()  # Ensure we load from DB, not from session cache
+
     latest = session.scalar(_get_latest_runs_stmt_partitioned("fake-dag"))
     assert latest is not None
+    assert {"run_after", "partition_date", "partition_key"}.isdisjoint(sa_inspect(latest).unloaded)
+    assert latest.run_after == tz.datetime(2025, 1, 1)
+    assert latest.partition_key == "2025-01-02"
     assert latest.partition_date == tz.datetime(2025, 1, 2)
 
 

Reply via email to