This is an automated email from the ASF dual-hosted git repository.

ephraimbuddy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 774b5b50e96 Determine latest Dag version by version_number, not created_at (#68389)
774b5b50e96 is described below

commit 774b5b50e962304ac838d4ce70504cf2eb44285d
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Mon Jun 15 12:36:25 2026 +0100

    Determine latest Dag version by version_number, not created_at (#68389)
    
    * Determine latest Dag version by version_number, not created_at
    
    The latest DagVersion was selected by created_at DESC, which is not
    deterministic when two versions share a timestamp (or under clock skew).
    write_dag derives the next version_number from that row, so picking a
    non-max row collided with the (dag_id, version_number) unique constraint.
    Order by the monotonic, unique version_number instead, consistently
    across get_latest_version/write_dag, get_version, and the bulk prefetch.
    
    * Fix grid multi-version test relying on created_at tie-breaking
    
    * Select latest serialized DAG by version_number, not created_at
    
    Extends the version_number-ordering fix to the remaining
    latest-serialized-DAG selectors: latest_item_select_object
    (SerializedDagModel.get, backfill), get_latest_serialized_dags (scheduler,
    partitions) and read_all_dags. Besides the created_at tie nondeterminism,
    get_latest_serialized_dags/read_all_dags joined on
    created_at == max(created_at), which returned duplicate rows per dag_id
    under a tie; matching on the unique version_number fixes that (and corrects
    read_all_dags' join condition, which combined its clauses with Python 'and'
    instead of the SQL '&' operator).
    
    * Extract latest-serialized-DAG-by-version_number select into a helper
    
    Dedupes the identical 'max version_number per dag_id' subquery used by
    get_latest_serialized_dags and read_all_dags into _latest_by_version_select.
---
 airflow-core/src/airflow/models/dag_version.py     |  9 ++-
 airflow-core/src/airflow/models/serialized_dag.py  | 87 ++++++++++++++--------
 .../api_fastapi/core_api/routes/ui/test_grid.py    | 13 ++--
 airflow-core/tests/unit/models/test_dag_version.py | 61 ++++++++++++++-
 .../tests/unit/models/test_serialized_dag.py       | 43 +++++++++++
 5 files changed, 172 insertions(+), 41 deletions(-)

diff --git a/airflow-core/src/airflow/models/dag_version.py b/airflow-core/src/airflow/models/dag_version.py
index 45ea40c091b..e6564a6da06 100644
--- a/airflow-core/src/airflow/models/dag_version.py
+++ b/airflow-core/src/airflow/models/dag_version.py
@@ -171,7 +171,12 @@ class DagVersion(Base):
         if load_bundle_model:
             query = query.options(joinedload(cls.bundle))
 
-        query = query.order_by(cls.created_at.desc()).limit(1)
+        # Order by version_number, not created_at: version_number is monotonic 
and unique per
+        # dag_id, so it is deterministic even when two versions share a 
created_at timestamp.
+        # write_dag relies on this select to compute the next version_number; 
ordering by
+        # created_at could pick a non-max row under a tie and collide with the
+        # (dag_id, version_number) unique constraint.
+        query = query.order_by(cls.version_number.desc()).limit(1)
         return query
 
     @classmethod
@@ -224,7 +229,7 @@ class DagVersion(Base):
         if version_number:
             version_select_obj = version_select_obj.where(cls.version_number 
== version_number)
 
-        return 
session.scalar(version_select_obj.order_by(cls.id.desc()).limit(1))
+        return 
session.scalar(version_select_obj.order_by(cls.version_number.desc()).limit(1))
 
     @property
     def version(self) -> str:
diff --git a/airflow-core/src/airflow/models/serialized_dag.py b/airflow-core/src/airflow/models/serialized_dag.py
index a27cad68908..ce033365311 100644
--- a/airflow-core/src/airflow/models/serialized_dag.py
+++ b/airflow-core/src/airflow/models/serialized_dag.py
@@ -58,6 +58,7 @@ from airflow.utils.sqlalchemy import UtcDateTime, 
get_dialect_name
 if TYPE_CHECKING:
     from sqlalchemy.orm import Session
     from sqlalchemy.orm.attributes import InstrumentedAttribute
+    from sqlalchemy.sql import Select
     from sqlalchemy.sql.elements import ColumnElement
 
     from airflow.serialization.definitions.dag import SerializedDAG
@@ -544,15 +545,18 @@ class SerializedDagModel(Base):
         if not dag_id_list:
             return {}
 
-        # Fetch latest serialized_dag (last_updated, dag_hash) per dag_id
-        # using a window function to pick the most recent row.
+        # Fetch the serialized_dag (last_updated, dag_hash) of the latest 
DagVersion per dag_id,
+        # ordering by version_number so it stays consistent with the 
DagVersion picked by dv_subq.
         sd_subq = (
             select(
                 cls.dag_id.label("dag_id"),
                 cls.last_updated.label("last_updated"),
                 cls.dag_hash.label("dag_hash"),
-                func.row_number().over(partition_by=cls.dag_id, 
order_by=cls.created_at.desc()).label("rn"),
+                func.row_number()
+                .over(partition_by=cls.dag_id, 
order_by=DagVersion.version_number.desc())
+                .label("rn"),
             )
+            .join(DagVersion, cls.dag_version_id == DagVersion.id)
             .where(cls.dag_id.in_(dag_id_list))
             .subquery()
         )
@@ -563,14 +567,13 @@ class SerializedDagModel(Base):
             row.dag_id: (row.last_updated, row.dag_hash) for row in sd_rows
         }
 
-        # Fetch latest DagVersion per dag_id using a window function,
-        # matching the original write_dag ordering (ORDER BY created_at DESC).
+        # Fetch latest DagVersion per dag_id, ordering by version_number to 
match write_dag.
         dv_subq = (
             select(
                 DagVersion.id.label("id"),
                 DagVersion.dag_id.label("dag_id"),
                 func.row_number()
-                .over(partition_by=DagVersion.dag_id, 
order_by=DagVersion.created_at.desc())
+                .over(partition_by=DagVersion.dag_id, 
order_by=DagVersion.version_number.desc())
                 .label("rn"),
             )
             .where(DagVersion.dag_id.in_(dag_id_list))
@@ -780,14 +783,55 @@ class SerializedDagModel(Base):
     def latest_item_select_object(cls, dag_id):
         from airflow.settings import engine
 
+        # Order by the version's version_number (monotonic, unique per 
dag_id), not created_at,
+        # so the latest serialized DAG is picked deterministically even when 
two versions share a
+        # created_at timestamp.
         if engine.dialect.name == "mysql":
             # Prevent "Out of sort memory" caused by large values in cls.data 
column for MySQL.
             # Details in https://github.com/apache/airflow/pull/55589
             latest_item_id = (
-                select(cls.id).where(cls.dag_id == 
dag_id).order_by(cls.created_at.desc()).limit(1)
+                select(cls.id)
+                .join(DagVersion, cls.dag_version_id == DagVersion.id)
+                .where(cls.dag_id == dag_id)
+                .order_by(DagVersion.version_number.desc())
+                .limit(1)
             )
             return select(cls).where(cls.id == latest_item_id)
-        return select(cls).where(cls.dag_id == 
dag_id).order_by(cls.created_at.desc()).limit(1)
+        return (
+            select(cls)
+            .join(DagVersion, cls.dag_version_id == DagVersion.id)
+            .where(cls.dag_id == dag_id)
+            .order_by(DagVersion.version_number.desc())
+            .limit(1)
+        )
+
+    @classmethod
+    def _latest_by_version_select(cls, dag_ids: list[str] | None = None) -> 
Select:
+        """
+        Select the serialized DAG with the highest ``version_number`` per 
dag_id.
+
+        Ordering by ``version_number`` (monotonic and unique per dag_id) is 
deterministic and,
+        unlike ``max(created_at)``, never returns two rows for a dag_id when 
versions share a
+        ``created_at`` timestamp.
+
+        :param dag_ids: If given, restrict to these dag_ids; otherwise cover 
all dags.
+        """
+        max_version_query = select(
+            DagVersion.dag_id, 
func.max(DagVersion.version_number).label("max_version")
+        ).join(cls, cls.dag_version_id == DagVersion.id)
+        if dag_ids is not None:
+            max_version_query = 
max_version_query.where(DagVersion.dag_id.in_(dag_ids))
+        max_version_subquery = 
max_version_query.group_by(DagVersion.dag_id).subquery()
+
+        return (
+            select(cls)
+            .join(DagVersion, cls.dag_version_id == DagVersion.id)
+            .join(
+                max_version_subquery,
+                (DagVersion.dag_id == max_version_subquery.c.dag_id)
+                & (DagVersion.version_number == 
max_version_subquery.c.max_version),
+            )
+        )
 
     @classmethod
     @provide_session
@@ -801,21 +845,7 @@ class SerializedDagModel(Base):
         :param session: The database session.
         :return: The latest serialized dag of the DAGs.
         """
-        # Subquery to get the latest serdag per dag_id
-        latest_serdag_subquery = (
-            select(cls.dag_id, func.max(cls.created_at).label("created_at"))
-            .where(cls.dag_id.in_(dag_ids))
-            .group_by(cls.dag_id)
-            .subquery()
-        )
-        latest_serdags = session.scalars(
-            select(cls)
-            .join(
-                latest_serdag_subquery,
-                cls.created_at == latest_serdag_subquery.c.created_at,
-            )
-            .where(cls.dag_id.in_(dag_ids))
-        ).all()
+        latest_serdags = 
session.scalars(cls._latest_by_version_select(dag_ids)).all()
         return latest_serdags or []
 
     @classmethod
@@ -827,16 +857,7 @@ class SerializedDagModel(Base):
         :param session: ORM Session
         :returns: a dict of DAGs read from database
         """
-        latest_serialized_dag_subquery = (
-            select(cls.dag_id, 
func.max(cls.created_at).label("max_created")).group_by(cls.dag_id).subquery()
-        )
-        serialized_dags = session.scalars(
-            select(cls).join(
-                latest_serialized_dag_subquery,
-                (cls.dag_id == latest_serialized_dag_subquery.c.dag_id)
-                and (cls.created_at == 
latest_serialized_dag_subquery.c.max_created),
-            )
-        )
+        serialized_dags = session.scalars(cls._latest_by_version_select())
 
         dags = {}
         for row in serialized_dags:
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py
index 8700e7ce96d..d55b2df3625 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py
@@ -28,6 +28,7 @@ from sqlalchemy.orm import Session
 
 from airflow._shared.timezones import timezone
 from airflow.models.dag import DagModel
+from airflow.models.dag_version import DagVersion
 from airflow.models.dagbag import DBDagBag
 from airflow.models.taskinstance import TaskInstance
 from airflow.providers.standard.operators.empty import EmptyOperator
@@ -643,15 +644,17 @@ class TestGetGridDataEndpoint:
         assert _strip_dag_version_ids(response.json()) == [GRID_RUN_1, 
GRID_RUN_2]
 
     def test_get_grid_runs_multiple_dag_versions(self, session, test_client):
-        latest_dag_version = 
session.scalar(select(DagModel).where(DagModel.dag_id == 
DAG_ID_5)).dag_versions[
-            -1
-        ]
-        latest_task_instance = session.scalar(
+        # run_5_2 is created after version 2 exists, so its task instances run 
on version 2.
+        # Reassign one of them to version 1 so the run spans two versions.
+        first_dag_version = session.scalar(
+            select(DagVersion).where(DagVersion.dag_id == DAG_ID_5, 
DagVersion.version_number == 1)
+        )
+        task_instance = session.scalar(
             select(TaskInstance)
             .where(TaskInstance.dag_id == DAG_ID_5, TaskInstance.run_id == 
"run_5_2")
             .limit(1)
         )
-        latest_task_instance.dag_version = latest_dag_version
+        task_instance.dag_version = first_dag_version
         session.commit()
 
         response = test_client.get(f"/grid/runs/{DAG_ID_5}?limit=5")
diff --git a/airflow-core/tests/unit/models/test_dag_version.py b/airflow-core/tests/unit/models/test_dag_version.py
index e7899422093..ec600c7be66 100644
--- a/airflow-core/tests/unit/models/test_dag_version.py
+++ b/airflow-core/tests/unit/models/test_dag_version.py
@@ -16,14 +16,19 @@
 # under the License.
 from __future__ import annotations
 
+from datetime import timedelta
+
 import pytest
 from sqlalchemy import func, select
 
+from airflow._shared.timezones import timezone
+from airflow.models.dag import DagModel
 from airflow.models.dag_version import DagVersion
+from airflow.models.dagbundle import DagBundleModel
 from airflow.providers.standard.operators.empty import EmptyOperator
 
 from tests_common.test_utils.dag import sync_dag_to_db
-from tests_common.test_utils.db import clear_db_dags
+from tests_common.test_utils.db import clear_db_dag_bundles, clear_db_dags
 
 pytestmark = pytest.mark.db_test
 
@@ -31,9 +36,12 @@ pytestmark = pytest.mark.db_test
 class TestDagVersion:
     def setup_method(self):
         clear_db_dags()
+        clear_db_dag_bundles()
 
     def teardown_method(self):
+        # clear_db_dags() first: DagModel.bundle_name has an FK to dag_bundle.
         clear_db_dags()
+        clear_db_dag_bundles()
 
     @pytest.mark.need_serialized_dag
     def test_writing_dag_version(self, dag_maker, session):
@@ -59,6 +67,57 @@ class TestDagVersion:
         assert latest_version.version_number == 2
         assert session.scalar(select(func.count()).where(DagVersion.dag_id == 
dag.dag_id)) == 2
 
+    @staticmethod
+    def _seed_two_versions_with_inverted_created_at(session, *, dag_id):
+        """Create versions 1 and 2 where version 2 has an *earlier* created_at 
than version 1.
+
+        This makes created_at ordering disagree with version_number ordering, 
modelling the
+        timestamp tie / clock-skew case the ordering must be robust to. 
Returns the bundle name.
+        """
+        bundle_name = f"bundle-{dag_id}"
+        session.add(DagBundleModel(name=bundle_name))
+        session.flush()
+        session.add(DagModel(dag_id=dag_id, bundle_name=bundle_name))
+        session.flush()
+
+        base = timezone.utcnow()
+        for version_number, created_at in ((1, base), (2, base - 
timedelta(minutes=1))):
+            session.add(
+                DagVersion(
+                    dag_id=dag_id,
+                    version_number=version_number,
+                    bundle_name=bundle_name,
+                    created_at=created_at,
+                    last_updated=created_at,
+                )
+            )
+        session.commit()
+        return bundle_name
+
+    def test_latest_version_uses_version_number_not_created_at(self, session):
+        """The latest version is the one with the highest version_number, not 
the latest created_at."""
+        dag_id = "test_latest_ordering"
+        self._seed_two_versions_with_inverted_created_at(session, 
dag_id=dag_id)
+
+        assert DagVersion.get_latest_version(dag_id, 
session=session).version_number == 2
+        assert DagVersion.get_version(dag_id, session=session).version_number 
== 2
+
+    def test_write_dag_increments_from_max_version_number(self, session):
+        """write_dag must increment from the max version_number, not the 
latest-created row.
+
+        Otherwise, when created_at ordering disagrees with version_number 
ordering, it would
+        recompute an already-used version_number and violate the (dag_id, 
version_number) unique
+        constraint.
+        """
+        dag_id = "test_write_dag_increment"
+        bundle_name = 
self._seed_two_versions_with_inverted_created_at(session, dag_id=dag_id)
+
+        new_version = DagVersion.write_dag(dag_id=dag_id, 
bundle_name=bundle_name, session=session)
+        session.commit()
+
+        assert new_version.version_number == 3
+        assert session.scalar(select(func.count()).where(DagVersion.dag_id == 
dag_id)) == 3
+
     @pytest.mark.need_serialized_dag
     def test_get_version(self, dag_maker, session):
         """The two dags have the same version name and number but different 
dag ids"""
diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py b/airflow-core/tests/unit/models/test_serialized_dag.py
index 5fa7d155f8e..a1ad63de5d6 100644
--- a/airflow-core/tests/unit/models/test_serialized_dag.py
+++ b/airflow-core/tests/unit/models/test_serialized_dag.py
@@ -348,6 +348,49 @@ class TestSerializedDagModel:
         latest_versions = SDM.get_latest_serialized_dags(dag_ids=["dag1", 
"dag2"], session=session)
         assert len(latest_versions) == 2
 
+    def _seed_two_versions(self, dag_maker, session, dag_id):
+        """Create two serialized versions of ``dag_id`` (version 1 has task 
instances)."""
+        with dag_maker(dag_id) as dag:
+            EmptyOperator(task_id="task1")
+        sync_dag_to_db(dag, session=session)
+        dag_maker.create_dagrun()
+        with dag_maker(dag_id) as dag:
+            EmptyOperator(task_id="task1")
+            EmptyOperator(task_id="task2")
+        sync_dag_to_db(dag, session=session)
+
+    def 
test_get_latest_serialized_dags_returns_one_row_per_dag_under_created_at_tie(
+        self, dag_maker, session
+    ):
+        """A created_at tie must not yield duplicate rows; the max 
version_number wins."""
+        self._seed_two_versions(dag_maker, session, "tie_dag")
+        # Force both serialized_dag rows to share a created_at (e.g. frozen 
clock / same second).
+        session.execute(
+            update(SDM).where(SDM.dag_id == 
"tie_dag").values(created_at=pendulum.datetime(2025, 1, 1))
+        )
+        session.commit()
+
+        latest = SDM.get_latest_serialized_dags(dag_ids=["tie_dag"], 
session=session)
+
+        assert len(latest) == 1
+        assert latest[0].dag_version.version_number == 2
+
+    def 
test_get_returns_latest_version_when_created_at_ordering_disagrees(self, 
dag_maker, session):
+        """SDM.get (latest_item_select_object) must pick the max 
version_number, not max created_at."""
+        self._seed_two_versions(dag_maker, session, "tie_dag2")
+        v1 = DagVersion.get_version("tie_dag2", 1, session=session)
+        v2 = DagVersion.get_version("tie_dag2", 2, session=session)
+        # Invert created_at: version 1's serdag looks "newer" than version 2's.
+        session.execute(
+            update(SDM).where(SDM.dag_version_id == 
v1.id).values(created_at=pendulum.datetime(2025, 1, 2))
+        )
+        session.execute(
+            update(SDM).where(SDM.dag_version_id == 
v2.id).values(created_at=pendulum.datetime(2025, 1, 1))
+        )
+        session.commit()
+
+        assert SDM.get("tie_dag2", session=session).dag_version.version_number 
== 2
+
     def test_new_dag_versions_are_not_created_if_no_dagruns(self, dag_maker, 
session):
         with dag_maker("dag1") as dag:
             PythonOperator(task_id="task1", python_callable=lambda: None)

Reply via email to