This is an automated email from the ASF dual-hosted git repository.
ephraimbuddy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 774b5b50e96 Determine latest Dag version by version_number, not created_at (#68389)
774b5b50e96 is described below
commit 774b5b50e962304ac838d4ce70504cf2eb44285d
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Mon Jun 15 12:36:25 2026 +0100
Determine latest Dag version by version_number, not created_at (#68389)
* Determine latest Dag version by version_number, not created_at
The latest DagVersion was selected by created_at DESC. That ordering is not
deterministic when two versions share a timestamp, and under clock skew it can
rank an older version above a newer one.
write_dag derives the next version_number from that row. Picking a row other
than the one with the maximum version_number could therefore recompute an
already-used version_number and collide with the (dag_id, version_number)
unique constraint.
Order by the monotonic, unique version_number instead, consistently
across get_latest_version/write_dag, get_version, and the bulk prefetch.
* Fix grid multi-version test relying on created_at tie-breaking
* Select latest serialized DAG by version_number, not created_at
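Extends the version_number-ordering fix to the remaining latest-serialized-DAG
selectors:
- latest_item_select_object (SerializedDagModel.get, backfill)
- get_latest_serialized_dags (scheduler, partitions)
- read_all_dags

Besides the created_at tie nondeterminism, get_latest_serialized_dags and
read_all_dags joined on created_at == max(created_at), which returned duplicate
rows per dag_id under a tie. get_latest_serialized_dags did not match on dag_id
in that join at all. Matching on the unique version_number per dag_id fixes this.

It also corrects read_all_dags' join condition. That condition combined its two
comparisons with Python 'and' rather than a SQL AND ('&' / and_()). As a result,
only the dag_id comparison reached the query and the created_at match was
silently dropped.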
* Extract latest-serialized-DAG-by-version_number select into a helper
Dedupes the identical 'max version_number per dag_id' subquery used by
get_latest_serialized_dags and read_all_dags into _latest_by_version_select.
---
airflow-core/src/airflow/models/dag_version.py | 9 ++-
airflow-core/src/airflow/models/serialized_dag.py | 87 ++++++++++++++--------
.../api_fastapi/core_api/routes/ui/test_grid.py | 13 ++--
airflow-core/tests/unit/models/test_dag_version.py | 61 ++++++++++++++-
.../tests/unit/models/test_serialized_dag.py | 43 +++++++++++
5 files changed, 172 insertions(+), 41 deletions(-)
diff --git a/airflow-core/src/airflow/models/dag_version.py
b/airflow-core/src/airflow/models/dag_version.py
index 45ea40c091b..e6564a6da06 100644
--- a/airflow-core/src/airflow/models/dag_version.py
+++ b/airflow-core/src/airflow/models/dag_version.py
@@ -171,7 +171,12 @@ class DagVersion(Base):
if load_bundle_model:
query = query.options(joinedload(cls.bundle))
- query = query.order_by(cls.created_at.desc()).limit(1)
+ # Order by version_number, not created_at: version_number is monotonic
and unique per
+ # dag_id, so it is deterministic even when two versions share a
created_at timestamp.
+ # write_dag relies on this select to compute the next version_number;
ordering by
+ # created_at could pick a non-max row under a tie and collide with the
+ # (dag_id, version_number) unique constraint.
+ query = query.order_by(cls.version_number.desc()).limit(1)
return query
@classmethod
@@ -224,7 +229,7 @@ class DagVersion(Base):
if version_number:
version_select_obj = version_select_obj.where(cls.version_number
== version_number)
- return
session.scalar(version_select_obj.order_by(cls.id.desc()).limit(1))
+ return
session.scalar(version_select_obj.order_by(cls.version_number.desc()).limit(1))
@property
def version(self) -> str:
diff --git a/airflow-core/src/airflow/models/serialized_dag.py
b/airflow-core/src/airflow/models/serialized_dag.py
index a27cad68908..ce033365311 100644
--- a/airflow-core/src/airflow/models/serialized_dag.py
+++ b/airflow-core/src/airflow/models/serialized_dag.py
@@ -58,6 +58,7 @@ from airflow.utils.sqlalchemy import UtcDateTime,
get_dialect_name
if TYPE_CHECKING:
from sqlalchemy.orm import Session
from sqlalchemy.orm.attributes import InstrumentedAttribute
+ from sqlalchemy.sql import Select
from sqlalchemy.sql.elements import ColumnElement
from airflow.serialization.definitions.dag import SerializedDAG
@@ -544,15 +545,18 @@ class SerializedDagModel(Base):
if not dag_id_list:
return {}
- # Fetch latest serialized_dag (last_updated, dag_hash) per dag_id
- # using a window function to pick the most recent row.
+ # Fetch the serialized_dag (last_updated, dag_hash) of the latest
DagVersion per dag_id,
+ # ordering by version_number so it stays consistent with the
DagVersion picked by dv_subq.
sd_subq = (
select(
cls.dag_id.label("dag_id"),
cls.last_updated.label("last_updated"),
cls.dag_hash.label("dag_hash"),
- func.row_number().over(partition_by=cls.dag_id,
order_by=cls.created_at.desc()).label("rn"),
+ func.row_number()
+ .over(partition_by=cls.dag_id,
order_by=DagVersion.version_number.desc())
+ .label("rn"),
)
+ .join(DagVersion, cls.dag_version_id == DagVersion.id)
.where(cls.dag_id.in_(dag_id_list))
.subquery()
)
@@ -563,14 +567,13 @@ class SerializedDagModel(Base):
row.dag_id: (row.last_updated, row.dag_hash) for row in sd_rows
}
- # Fetch latest DagVersion per dag_id using a window function,
- # matching the original write_dag ordering (ORDER BY created_at DESC).
+ # Fetch latest DagVersion per dag_id, ordering by version_number to
match write_dag.
dv_subq = (
select(
DagVersion.id.label("id"),
DagVersion.dag_id.label("dag_id"),
func.row_number()
- .over(partition_by=DagVersion.dag_id,
order_by=DagVersion.created_at.desc())
+ .over(partition_by=DagVersion.dag_id,
order_by=DagVersion.version_number.desc())
.label("rn"),
)
.where(DagVersion.dag_id.in_(dag_id_list))
@@ -780,14 +783,55 @@ class SerializedDagModel(Base):
def latest_item_select_object(cls, dag_id):
from airflow.settings import engine
+ # Order by the version's version_number (monotonic, unique per
dag_id), not created_at,
+ # so the latest serialized DAG is picked deterministically even when
two versions share a
+ # created_at timestamp.
if engine.dialect.name == "mysql":
# Prevent "Out of sort memory" caused by large values in cls.data
column for MySQL.
# Details in https://github.com/apache/airflow/pull/55589
latest_item_id = (
- select(cls.id).where(cls.dag_id ==
dag_id).order_by(cls.created_at.desc()).limit(1)
+ select(cls.id)
+ .join(DagVersion, cls.dag_version_id == DagVersion.id)
+ .where(cls.dag_id == dag_id)
+ .order_by(DagVersion.version_number.desc())
+ .limit(1)
)
return select(cls).where(cls.id == latest_item_id)
- return select(cls).where(cls.dag_id ==
dag_id).order_by(cls.created_at.desc()).limit(1)
+ return (
+ select(cls)
+ .join(DagVersion, cls.dag_version_id == DagVersion.id)
+ .where(cls.dag_id == dag_id)
+ .order_by(DagVersion.version_number.desc())
+ .limit(1)
+ )
+
+ @classmethod
+ def _latest_by_version_select(cls, dag_ids: list[str] | None = None) ->
Select:
+ """
+ Select the serialized DAG with the highest ``version_number`` per
dag_id.
+
+ Ordering by ``version_number`` (monotonic and unique per dag_id) is
deterministic and,
+ unlike ``max(created_at)``, never returns two rows for a dag_id when
versions share a
+ ``created_at`` timestamp.
+
+ :param dag_ids: If given, restrict to these dag_ids; otherwise cover
all dags.
+ """
+ max_version_query = select(
+ DagVersion.dag_id,
func.max(DagVersion.version_number).label("max_version")
+ ).join(cls, cls.dag_version_id == DagVersion.id)
+ if dag_ids is not None:
+ max_version_query =
max_version_query.where(DagVersion.dag_id.in_(dag_ids))
+ max_version_subquery =
max_version_query.group_by(DagVersion.dag_id).subquery()
+
+ return (
+ select(cls)
+ .join(DagVersion, cls.dag_version_id == DagVersion.id)
+ .join(
+ max_version_subquery,
+ (DagVersion.dag_id == max_version_subquery.c.dag_id)
+ & (DagVersion.version_number ==
max_version_subquery.c.max_version),
+ )
+ )
@classmethod
@provide_session
@@ -801,21 +845,7 @@ class SerializedDagModel(Base):
:param session: The database session.
:return: The latest serialized dag of the DAGs.
"""
- # Subquery to get the latest serdag per dag_id
- latest_serdag_subquery = (
- select(cls.dag_id, func.max(cls.created_at).label("created_at"))
- .where(cls.dag_id.in_(dag_ids))
- .group_by(cls.dag_id)
- .subquery()
- )
- latest_serdags = session.scalars(
- select(cls)
- .join(
- latest_serdag_subquery,
- cls.created_at == latest_serdag_subquery.c.created_at,
- )
- .where(cls.dag_id.in_(dag_ids))
- ).all()
+ latest_serdags =
session.scalars(cls._latest_by_version_select(dag_ids)).all()
return latest_serdags or []
@classmethod
@@ -827,16 +857,7 @@ class SerializedDagModel(Base):
:param session: ORM Session
:returns: a dict of DAGs read from database
"""
- latest_serialized_dag_subquery = (
- select(cls.dag_id,
func.max(cls.created_at).label("max_created")).group_by(cls.dag_id).subquery()
- )
- serialized_dags = session.scalars(
- select(cls).join(
- latest_serialized_dag_subquery,
- (cls.dag_id == latest_serialized_dag_subquery.c.dag_id)
- and (cls.created_at ==
latest_serialized_dag_subquery.c.max_created),
- )
- )
+ serialized_dags = session.scalars(cls._latest_by_version_select())
dags = {}
for row in serialized_dags:
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py
index 8700e7ce96d..d55b2df3625 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py
@@ -28,6 +28,7 @@ from sqlalchemy.orm import Session
from airflow._shared.timezones import timezone
from airflow.models.dag import DagModel
+from airflow.models.dag_version import DagVersion
from airflow.models.dagbag import DBDagBag
from airflow.models.taskinstance import TaskInstance
from airflow.providers.standard.operators.empty import EmptyOperator
@@ -643,15 +644,17 @@ class TestGetGridDataEndpoint:
assert _strip_dag_version_ids(response.json()) == [GRID_RUN_1,
GRID_RUN_2]
def test_get_grid_runs_multiple_dag_versions(self, session, test_client):
- latest_dag_version =
session.scalar(select(DagModel).where(DagModel.dag_id ==
DAG_ID_5)).dag_versions[
- -1
- ]
- latest_task_instance = session.scalar(
+ # run_5_2 is created after version 2 exists, so its task instances run
on version 2.
+ # Reassign one of them to version 1 so the run spans two versions.
+ first_dag_version = session.scalar(
+ select(DagVersion).where(DagVersion.dag_id == DAG_ID_5,
DagVersion.version_number == 1)
+ )
+ task_instance = session.scalar(
select(TaskInstance)
.where(TaskInstance.dag_id == DAG_ID_5, TaskInstance.run_id ==
"run_5_2")
.limit(1)
)
- latest_task_instance.dag_version = latest_dag_version
+ task_instance.dag_version = first_dag_version
session.commit()
response = test_client.get(f"/grid/runs/{DAG_ID_5}?limit=5")
diff --git a/airflow-core/tests/unit/models/test_dag_version.py
b/airflow-core/tests/unit/models/test_dag_version.py
index e7899422093..ec600c7be66 100644
--- a/airflow-core/tests/unit/models/test_dag_version.py
+++ b/airflow-core/tests/unit/models/test_dag_version.py
@@ -16,14 +16,19 @@
# under the License.
from __future__ import annotations
+from datetime import timedelta
+
import pytest
from sqlalchemy import func, select
+from airflow._shared.timezones import timezone
+from airflow.models.dag import DagModel
from airflow.models.dag_version import DagVersion
+from airflow.models.dagbundle import DagBundleModel
from airflow.providers.standard.operators.empty import EmptyOperator
from tests_common.test_utils.dag import sync_dag_to_db
-from tests_common.test_utils.db import clear_db_dags
+from tests_common.test_utils.db import clear_db_dag_bundles, clear_db_dags
pytestmark = pytest.mark.db_test
@@ -31,9 +36,12 @@ pytestmark = pytest.mark.db_test
class TestDagVersion:
def setup_method(self):
clear_db_dags()
+ clear_db_dag_bundles()
def teardown_method(self):
+ # clear_db_dags() first: DagModel.bundle_name has an FK to dag_bundle.
clear_db_dags()
+ clear_db_dag_bundles()
@pytest.mark.need_serialized_dag
def test_writing_dag_version(self, dag_maker, session):
@@ -59,6 +67,57 @@ class TestDagVersion:
assert latest_version.version_number == 2
assert session.scalar(select(func.count()).where(DagVersion.dag_id ==
dag.dag_id)) == 2
+ @staticmethod
+ def _seed_two_versions_with_inverted_created_at(session, *, dag_id):
+ """Create versions 1 and 2 where version 2 has an *earlier* created_at
than version 1.
+
+ This makes created_at ordering disagree with version_number ordering,
modelling the
+ timestamp tie / clock-skew case the ordering must be robust to.
Returns the bundle name.
+ """
+ bundle_name = f"bundle-{dag_id}"
+ session.add(DagBundleModel(name=bundle_name))
+ session.flush()
+ session.add(DagModel(dag_id=dag_id, bundle_name=bundle_name))
+ session.flush()
+
+ base = timezone.utcnow()
+ for version_number, created_at in ((1, base), (2, base -
timedelta(minutes=1))):
+ session.add(
+ DagVersion(
+ dag_id=dag_id,
+ version_number=version_number,
+ bundle_name=bundle_name,
+ created_at=created_at,
+ last_updated=created_at,
+ )
+ )
+ session.commit()
+ return bundle_name
+
+ def test_latest_version_uses_version_number_not_created_at(self, session):
+ """The latest version is the one with the highest version_number, not
the latest created_at."""
+ dag_id = "test_latest_ordering"
+ self._seed_two_versions_with_inverted_created_at(session,
dag_id=dag_id)
+
+ assert DagVersion.get_latest_version(dag_id,
session=session).version_number == 2
+ assert DagVersion.get_version(dag_id, session=session).version_number
== 2
+
+ def test_write_dag_increments_from_max_version_number(self, session):
+ """write_dag must increment from the max version_number, not the
latest-created row.
+
+ Otherwise, when created_at ordering disagrees with version_number
ordering, it would
+ recompute an already-used version_number and violate the (dag_id,
version_number) unique
+ constraint.
+ """
+ dag_id = "test_write_dag_increment"
+ bundle_name =
self._seed_two_versions_with_inverted_created_at(session, dag_id=dag_id)
+
+ new_version = DagVersion.write_dag(dag_id=dag_id,
bundle_name=bundle_name, session=session)
+ session.commit()
+
+ assert new_version.version_number == 3
+ assert session.scalar(select(func.count()).where(DagVersion.dag_id ==
dag_id)) == 3
+
@pytest.mark.need_serialized_dag
def test_get_version(self, dag_maker, session):
"""The two dags have the same version name and number but different
dag ids"""
diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py
b/airflow-core/tests/unit/models/test_serialized_dag.py
index 5fa7d155f8e..a1ad63de5d6 100644
--- a/airflow-core/tests/unit/models/test_serialized_dag.py
+++ b/airflow-core/tests/unit/models/test_serialized_dag.py
@@ -348,6 +348,49 @@ class TestSerializedDagModel:
latest_versions = SDM.get_latest_serialized_dags(dag_ids=["dag1",
"dag2"], session=session)
assert len(latest_versions) == 2
+ def _seed_two_versions(self, dag_maker, session, dag_id):
+ """Create two serialized versions of ``dag_id`` (version 1 has task
instances)."""
+ with dag_maker(dag_id) as dag:
+ EmptyOperator(task_id="task1")
+ sync_dag_to_db(dag, session=session)
+ dag_maker.create_dagrun()
+ with dag_maker(dag_id) as dag:
+ EmptyOperator(task_id="task1")
+ EmptyOperator(task_id="task2")
+ sync_dag_to_db(dag, session=session)
+
+ def
test_get_latest_serialized_dags_returns_one_row_per_dag_under_created_at_tie(
+ self, dag_maker, session
+ ):
+ """A created_at tie must not yield duplicate rows; the max
version_number wins."""
+ self._seed_two_versions(dag_maker, session, "tie_dag")
+ # Force both serialized_dag rows to share a created_at (e.g. frozen
clock / same second).
+ session.execute(
+ update(SDM).where(SDM.dag_id ==
"tie_dag").values(created_at=pendulum.datetime(2025, 1, 1))
+ )
+ session.commit()
+
+ latest = SDM.get_latest_serialized_dags(dag_ids=["tie_dag"],
session=session)
+
+ assert len(latest) == 1
+ assert latest[0].dag_version.version_number == 2
+
+ def
test_get_returns_latest_version_when_created_at_ordering_disagrees(self,
dag_maker, session):
+ """SDM.get (latest_item_select_object) must pick the max
version_number, not max created_at."""
+ self._seed_two_versions(dag_maker, session, "tie_dag2")
+ v1 = DagVersion.get_version("tie_dag2", 1, session=session)
+ v2 = DagVersion.get_version("tie_dag2", 2, session=session)
+ # Invert created_at: version 1's serdag looks "newer" than version 2's.
+ session.execute(
+ update(SDM).where(SDM.dag_version_id ==
v1.id).values(created_at=pendulum.datetime(2025, 1, 2))
+ )
+ session.execute(
+ update(SDM).where(SDM.dag_version_id ==
v2.id).values(created_at=pendulum.datetime(2025, 1, 1))
+ )
+ session.commit()
+
+ assert SDM.get("tie_dag2", session=session).dag_version.version_number
== 2
+
def test_new_dag_versions_are_not_created_if_no_dagruns(self, dag_maker,
session):
with dag_maker("dag1") as dag:
PythonOperator(task_id="task1", python_callable=lambda: None)