This is an automated email from the ASF dual-hosted git repository. rahulvats pushed a commit to branch v3-2-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 0656380f62a156cff8db078d4f3e33eb5fd3d58b Author: Wei Lee <[email protected]> AuthorDate: Wed Mar 25 14:37:43 2026 +0800 fix(Asset-Partition): sort partitioned DagRun by partition_date (#62866) (cherry picked from commit 1baafd48652279a0f2214db885afae2890889731) --- .../src/airflow/dag_processing/collection.py | 7 +++-- .../tests/unit/dag_processing/test_collection.py | 31 ++++++++++++++++++++++ 2 files changed, 36 insertions(+), 2 deletions(-) diff --git a/airflow-core/src/airflow/dag_processing/collection.py b/airflow-core/src/airflow/dag_processing/collection.py index e5925bdd3e5..1d60c32020f 100644 --- a/airflow-core/src/airflow/dag_processing/collection.py +++ b/airflow-core/src/airflow/dag_processing/collection.py @@ -136,7 +136,6 @@ def _get_latest_runs_stmt(dag_id: str) -> Select: def _get_latest_runs_stmt_partitioned(dag_id: str) -> Select: """Build a select statement to retrieve the last partitioned run for each Dag.""" - # todo: AIP-76 we should add a partition date field latest_run_id = ( select(DagRun.id) .where( @@ -149,7 +148,11 @@ def _get_latest_runs_stmt_partitioned(dag_id: str) -> Select: ), DagRun.partition_key.is_not(None), ) - .order_by(DagRun.id.desc()) # todo: AIP-76 add partition date and sort by it here + .order_by( + DagRun.partition_date.is_(None), + DagRun.partition_date.desc(), + DagRun.run_after.desc(), + ) .limit(1) .scalar_subquery() ) diff --git a/airflow-core/tests/unit/dag_processing/test_collection.py b/airflow-core/tests/unit/dag_processing/test_collection.py index 1c12371de7d..77dc1318b05 100644 --- a/airflow-core/tests/unit/dag_processing/test_collection.py +++ b/airflow-core/tests/unit/dag_processing/test_collection.py @@ -37,6 +37,7 @@ from airflow.dag_processing.collection import ( AssetModelOperation, DagModelOperation, _get_latest_runs_stmt, + _get_latest_runs_stmt_partitioned, _update_dag_tags, update_dag_parsing_results_in_db, ) @@ -58,6 +59,7 @@ from airflow.sdk import DAG, Asset, AssetAlias, AssetWatcher from 
airflow.serialization.definitions.assets import SerializedAsset from airflow.serialization.encoders import ensure_serialized_asset from airflow.serialization.serialized_objects import LazyDeserializedDAG +from airflow.utils.types import DagRunType from tests_common.test_utils.config import conf_vars from tests_common.test_utils.db import ( @@ -96,6 +98,35 @@ def test_statement_latest_runs_one_dag(): assert actual == expected, compiled_stmt +@pytest.mark.db_test +def test_statement_latest_runs_partitioned_sorted_by_partition_date(dag_maker, session): + with dag_maker("fake-dag", schedule=None): + pass + dag_maker.sync_dagbag_to_db() + + for i, (run_id, partition_key, partition_date) in enumerate( + ( + ("newest-partition-date", "2025-01-02", tz.datetime(2025, 1, 2)), + ("older-partition-date", "2025-01-01", tz.datetime(2025, 1, 1)), + ("null-partition-date", "not-a-time-based-partition", None), + ) + ): + dag_maker.create_dagrun( + run_id=run_id, + logical_date=None, + data_interval=None, + run_type=DagRunType.SCHEDULED, + run_after=tz.datetime(2025, 1, 1 + i), + partition_key=partition_key, + partition_date=partition_date, + session=session, + ) + + latest = session.scalar(_get_latest_runs_stmt_partitioned("fake-dag")) + assert latest is not None + assert latest.partition_date == tz.datetime(2025, 1, 2) + + @pytest.mark.db_test class TestAssetModelOperation: @staticmethod
