This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new eeb203ee932 Update SerializedDagModel query to fetch DagVersion with
joinedload (#56422)
eeb203ee932 is described below
commit eeb203ee932672c7a8225cb8bba32fd3e4da408c
Author: anshuksi282-ksolves <[email protected]>
AuthorDate: Mon Oct 27 15:28:45 2025 +0530
Update SerializedDagModel query to fetch DagVersion with joinedload (#56422)
---
airflow-core/src/airflow/models/serialized_dag.py | 15 +++++++++++----
1 file changed, 11 insertions(+), 4 deletions(-)
diff --git a/airflow-core/src/airflow/models/serialized_dag.py
b/airflow-core/src/airflow/models/serialized_dag.py
index e5df49e2a95..c584ab27d92 100644
--- a/airflow-core/src/airflow/models/serialized_dag.py
+++ b/airflow-core/src/airflow/models/serialized_dag.py
@@ -29,7 +29,7 @@ import sqlalchemy_jsonfield
import uuid6
from sqlalchemy import ForeignKey, LargeBinary, String, select, tuple_
from sqlalchemy.dialects.postgresql import JSONB
-from sqlalchemy.orm import Mapped, backref, foreign, relationship
+from sqlalchemy.orm import Mapped, backref, foreign, joinedload, relationship
from sqlalchemy.sql.expression import func, literal
from sqlalchemy_utils import UUIDType
@@ -414,7 +414,14 @@ class SerializedDagModel(Base):
serialized_dag_hash = session.scalars(
select(cls.dag_hash).where(cls.dag_id ==
dag.dag_id).order_by(cls.created_at.desc())
).first()
- dag_version = DagVersion.get_latest_version(dag.dag_id,
session=session)
+ dag_version = session.scalar(
+ select(DagVersion)
+ .where(DagVersion.dag_id == dag.dag_id)
+ .options(joinedload(DagVersion.task_instances))
+ .options(joinedload(DagVersion.serialized_dag))
+ .order_by(DagVersion.created_at.desc())
+ .limit(1)
+ )
if (
serialized_dag_hash == new_serialized_dag.dag_hash
@@ -429,8 +436,8 @@ class SerializedDagModel(Base):
# the serialized dag, the dag_version and the dag_code instead of
a new version
# if the dag_version is not associated with any task instances
latest_ser_dag = cls.get(dag.dag_id, session=session)
- if TYPE_CHECKING:
- assert latest_ser_dag is not None
+ if not latest_ser_dag:
+ return False
# Update the serialized DAG with the new_serialized_dag
latest_ser_dag._data = new_serialized_dag._data
latest_ser_dag._data_compressed =
new_serialized_dag._data_compressed