This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new cc8aa7bc42e AIP-65: Update dag source endpoint to support versioning (#43492)
cc8aa7bc42e is described below

commit cc8aa7bc42e5ec9d1766e4b58297ce6f896de7ce
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Tue Nov 19 12:26:07 2024 +0100

    AIP-65: Update dag source endpoint to support versioning (#43492)
    
    * AIP-65: Update dag source endpoint to support versioning
    
    Enhanced the DAG source endpoint to support version-based retrieval
    
    Refactored the get_dag_source function to allow fetching specific versions of DAG source code using dag_id, version_name, and version_number parameters.
    
    Replaced file_token with dag_id in endpoint paths and removed unnecessary token-based access.
    
    Updated OpenAPI specifications and requested serializers to include new versioning parameters.
    
    Modified API response schema to include dag_id, version_name, and version_number for improved version tracking.
    
    Added/updated tests
    
    * Remove version_name
    
    * fixup! Remove version_name
    
    * fixup! fixup! Remove version_name
    
    * Fix test in fab provider
    
    * fix conflicts
    
    * Remove async def
    
    * fix conflicts
---
 .../api_connexion/endpoints/dag_source_endpoint.py |  58 ++--
 airflow/api_connexion/openapi/v1.yaml              |  16 +-
 airflow/api_connexion/schemas/dag_source_schema.py |   2 +
 .../api_fastapi/core_api/datamodels/dag_sources.py |   2 +
 .../api_fastapi/core_api/openapi/v1-generated.yaml |  24 +-
 .../core_api/routes/public/dag_sources.py          |  25 +-
 .../versions/0047_3_0_0_add_dag_versioning.py      |   1 -
 airflow/models/dag.py                              |   1 -
 airflow/models/dag_version.py                      |  23 +-
 airflow/models/serialized_dag.py                   |   1 -
 airflow/serialization/schema.json                  |   1 -
 airflow/ui/openapi-gen/queries/common.ts           |   8 +-
 airflow/ui/openapi-gen/queries/prefetch.ts         |  15 +-
 airflow/ui/openapi-gen/queries/queries.ts          |  13 +-
 airflow/ui/openapi-gen/queries/suspense.ts         |  13 +-
 airflow/ui/openapi-gen/requests/schemas.gen.ts     |  17 +-
 airflow/ui/openapi-gen/requests/services.gen.ts    |  10 +-
 airflow/ui/openapi-gen/requests/types.gen.ts       |   7 +-
 airflow/ui/src/pages/DagsList/Dag/Code/Code.tsx    |  10 +-
 airflow/www/static/js/api/useDagCode.ts            |   8 +-
 airflow/www/static/js/types/api-generated.ts       |  27 +-
 airflow/www/templates/airflow/dag.html             |   2 +-
 docs/apache-airflow/img/airflow_erd.sha256         |   2 +-
 docs/apache-airflow/img/airflow_erd.svg            | 328 ++++++++++-----------
 .../api_endpoints/test_dag_source_endpoint.py      |  16 +-
 task_sdk/src/airflow/sdk/definitions/dag.py        |   6 -
 .../endpoints/test_dag_source_endpoint.py          | 122 ++++----
 .../core_api/routes/public/test_dag_sources.py     | 101 ++++---
 tests/models/test_dag_version.py                   |  37 +--
 tests/models/test_serialized_dag.py                |   2 +-
 tests_common/pytest_plugin.py                      |   1 -
 31 files changed, 480 insertions(+), 419 deletions(-)

diff --git a/airflow/api_connexion/endpoints/dag_source_endpoint.py 
b/airflow/api_connexion/endpoints/dag_source_endpoint.py
index 9a3285884e7..e53ffe89c73 100644
--- a/airflow/api_connexion/endpoints/dag_source_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_source_endpoint.py
@@ -19,15 +19,15 @@ from __future__ import annotations
 from http import HTTPStatus
 from typing import TYPE_CHECKING, Sequence
 
-from flask import Response, current_app, request
-from itsdangerous import BadSignature, URLSafeSerializer
+from flask import Response, request
+from sqlalchemy import select
 
 from airflow.api_connexion import security
 from airflow.api_connexion.exceptions import NotFound, PermissionDenied
 from airflow.api_connexion.schemas.dag_source_schema import dag_source_schema
 from airflow.auth.managers.models.resource_details import DagAccessEntity, 
DagDetails
 from airflow.models.dag import DagModel
-from airflow.models.dagcode import DagCode
+from airflow.models.dag_version import DagVersion
 from airflow.utils.api_migration import mark_fastapi_migration_done
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.www.extensions.init_auth_manager import get_auth_manager
@@ -41,32 +41,42 @@ if TYPE_CHECKING:
 @mark_fastapi_migration_done
 @security.requires_access_dag("GET", DagAccessEntity.CODE)
 @provide_session
-def get_dag_source(*, file_token: str, session: Session = NEW_SESSION) -> 
Response:
-    """Get source code using file token."""
-    secret_key = current_app.config["SECRET_KEY"]
-    auth_s = URLSafeSerializer(secret_key)
-    try:
-        path = auth_s.loads(file_token)
-        dag_ids = session.query(DagModel.dag_id).filter(DagModel.fileloc == 
path).all()
-        requests: Sequence[IsAuthorizedDagRequest] = [
-            {
-                "method": "GET",
-                "details": DagDetails(id=dag_id[0]),
-            }
-            for dag_id in dag_ids
-        ]
+def get_dag_source(
+    *,
+    dag_id: str,
+    version_number: int | None = None,
+    session: Session = NEW_SESSION,
+) -> Response:
+    """Get source code from DagCode."""
+    dag_version = DagVersion.get_version(dag_id, version_number, 
session=session)
+    if not dag_version:
+        raise NotFound(f"The source code of the DAG {dag_id}, version_number 
{version_number} was not found")
+    path = dag_version.dag_code.fileloc
+    dag_ids = session.scalars(select(DagModel.dag_id).where(DagModel.fileloc 
== path)).all()
+    requests: Sequence[IsAuthorizedDagRequest] = [
+        {
+            "method": "GET",
+            "details": DagDetails(id=dag_id[0]),
+        }
+        for dag_id in dag_ids
+    ]
 
-        # Check if user has read access to all the DAGs defined in the file
-        if not get_auth_manager().batch_is_authorized_dag(requests):
-            raise PermissionDenied()
-        dag_source = DagCode.code(path, session=session)
-    except (BadSignature, FileNotFoundError):
-        raise NotFound("Dag source not found")
+    # Check if user has read access to all the DAGs defined in the file
+    if not get_auth_manager().batch_is_authorized_dag(requests):
+        raise PermissionDenied()
+    dag_source = dag_version.dag_code.source_code
+    version_number = dag_version.version_number
 
     return_type = request.accept_mimetypes.best_match(["text/plain", 
"application/json"])
     if return_type == "text/plain":
         return Response(dag_source, headers={"Content-Type": return_type})
     if return_type == "application/json":
-        content = dag_source_schema.dumps({"content": dag_source})
+        content = dag_source_schema.dumps(
+            {
+                "content": dag_source,
+                "dag_id": dag_id,
+                "version_number": version_number,
+            }
+        )
         return Response(content, headers={"Content-Type": return_type})
     return Response("Not Allowed Accept Header", 
status=HTTPStatus.NOT_ACCEPTABLE)
diff --git a/airflow/api_connexion/openapi/v1.yaml 
b/airflow/api_connexion/openapi/v1.yaml
index 244349fb946..fcf41462d23 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -2263,10 +2263,9 @@ paths:
         "403":
           $ref: "#/components/responses/PermissionDenied"
 
-  /dagSources/{file_token}:
+  /dagSources/{dag_id}:
     parameters:
-      - $ref: "#/components/parameters/FileToken"
-
+      - $ref: "#/components/parameters/DAGID"
     get:
       summary: Get a source code
       description: >
@@ -2274,6 +2273,8 @@ paths:
       x-openapi-router-controller: 
airflow.api_connexion.endpoints.dag_source_endpoint
       operationId: get_dag_source
       tags: [DAG]
+      parameters:
+        - $ref: "#/components/parameters/VersionNumber"
       responses:
         "200":
           description: Success.
@@ -5860,6 +5861,15 @@ components:
       description: |
         List of field for return.
 
+    VersionNumber:
+      in: query
+      name: version_number
+      schema:
+        type: integer
+      description: |
+        The version number.
+
+
   # Reusable request bodies
   requestBodies: {}
 
diff --git a/airflow/api_connexion/schemas/dag_source_schema.py 
b/airflow/api_connexion/schemas/dag_source_schema.py
index adb89ce76a5..75da5904db6 100644
--- a/airflow/api_connexion/schemas/dag_source_schema.py
+++ b/airflow/api_connexion/schemas/dag_source_schema.py
@@ -23,6 +23,8 @@ class DagSourceSchema(Schema):
     """Dag Source schema."""
 
     content = fields.String(dump_only=True)
+    dag_id = fields.String(dump_only=True)
+    version_number = fields.Integer(dump_only=True)
 
 
 dag_source_schema = DagSourceSchema()
diff --git a/airflow/api_fastapi/core_api/datamodels/dag_sources.py 
b/airflow/api_fastapi/core_api/datamodels/dag_sources.py
index 8cae02be1a8..4b84bd1e6b1 100644
--- a/airflow/api_fastapi/core_api/datamodels/dag_sources.py
+++ b/airflow/api_fastapi/core_api/datamodels/dag_sources.py
@@ -23,3 +23,5 @@ class DAGSourceResponse(BaseModel):
     """DAG Source serializer for responses."""
 
     content: str | None
+    dag_id: str
+    version_number: int | None
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml 
b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index e23f83634c4..2a211f54256 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -1602,7 +1602,7 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/HTTPValidationError'
-  /public/dagSources/{file_token}:
+  /public/dagSources/{dag_id}:
     get:
       tags:
       - DagSource
@@ -1610,12 +1610,20 @@ paths:
       description: Get source code using file token.
       operationId: get_dag_source
       parameters:
-      - name: file_token
+      - name: dag_id
         in: path
         required: true
         schema:
           type: string
-          title: File Token
+          title: Dag Id
+      - name: version_number
+        in: query
+        required: false
+        schema:
+          anyOf:
+          - type: integer
+          - type: 'null'
+          title: Version Number
       - name: accept
         in: header
         required: false
@@ -5132,9 +5140,19 @@ components:
           - type: string
           - type: 'null'
           title: Content
+        dag_id:
+          type: string
+          title: Dag Id
+        version_number:
+          anyOf:
+          - type: integer
+          - type: 'null'
+          title: Version Number
       type: object
       required:
       - content
+      - dag_id
+      - version_number
       title: DAGSourceResponse
       description: DAG Source serializer for responses.
     DAGTagCollectionResponse:
diff --git a/airflow/api_fastapi/core_api/routes/public/dag_sources.py 
b/airflow/api_fastapi/core_api/routes/public/dag_sources.py
index c1a4c97e4c7..fbe30620228 100644
--- a/airflow/api_fastapi/core_api/routes/public/dag_sources.py
+++ b/airflow/api_fastapi/core_api/routes/public/dag_sources.py
@@ -19,14 +19,13 @@ from __future__ import annotations
 from typing import Annotated
 
 from fastapi import Depends, Header, HTTPException, Request, Response, status
-from itsdangerous import BadSignature, URLSafeSerializer
 from sqlalchemy.orm import Session
 
 from airflow.api_fastapi.common.db.common import get_session
 from airflow.api_fastapi.common.router import AirflowRouter
 from airflow.api_fastapi.core_api.datamodels.dag_sources import 
DAGSourceResponse
 from airflow.api_fastapi.core_api.openapi.exceptions import 
create_openapi_http_exception_doc
-from airflow.models.dagcode import DagCode
+from airflow.models.dag_version import DagVersion
 
 dag_sources_router = AirflowRouter(tags=["DagSource"], prefix="/dagSources")
 
@@ -36,7 +35,7 @@ mime_type_any = "*/*"
 
 
 @dag_sources_router.get(
-    "/{file_token}",
+    "/{dag_id}",
     responses={
         **create_openapi_http_exception_doc(
             [
@@ -55,21 +54,23 @@ mime_type_any = "*/*"
     response_model=DAGSourceResponse,
 )
 def get_dag_source(
-    file_token: str,
+    dag_id: str,
     session: Annotated[Session, Depends(get_session)],
     request: Request,
     accept: Annotated[str, Header()] = mime_type_any,
+    version_number: int | None = None,
 ):
     """Get source code using file token."""
-    auth_s = URLSafeSerializer(request.app.state.secret_key)
-
-    try:
-        path = auth_s.loads(file_token)
-        dag_source_model = DAGSourceResponse(
-            content=DagCode.code(path, session=session),
+    dag_version = DagVersion.get_version(dag_id, version_number, 
session=session)
+    if not dag_version:
+        raise HTTPException(
+            status.HTTP_404_NOT_FOUND,
+            f"The source code of the DAG {dag_id}, version_number 
{version_number} was not found",
         )
-    except (BadSignature, FileNotFoundError):
-        raise HTTPException(status.HTTP_404_NOT_FOUND, "DAG source not found")
+
+    dag_source = dag_version.dag_code.source_code
+    version_number = dag_version.version_number
+    dag_source_model = DAGSourceResponse(dag_id=dag_id, content=dag_source, 
version_number=version_number)
 
     if accept.startswith(mime_type_text):
         return Response(dag_source_model.content, media_type=mime_type_text)
diff --git a/airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py 
b/airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py
index 5d74507d602..e98bf5a2008 100644
--- a/airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py
+++ b/airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py
@@ -57,7 +57,6 @@ def upgrade():
         "dag_version",
         sa.Column("id", UUIDType(binary=False), nullable=False),
         sa.Column("version_number", sa.Integer(), nullable=False),
-        sa.Column("version_name", StringID()),
         sa.Column("dag_id", StringID(), nullable=False),
         sa.Column("created_at", UtcDateTime(), nullable=False, 
default=timezone.utcnow),
         sa.ForeignKeyConstraint(
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index ff4eac87b46..135a69dfd0f 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -424,7 +424,6 @@ class DAG(TaskSDKDag, LoggingMixin):
         **Warning**: A fail stop dag can only have tasks with the default 
trigger rule ("all_success").
         An exception will be thrown if any task in a fail stop dag has a non 
default trigger rule.
     :param dag_display_name: The display name of the DAG which appears on the 
UI.
-    :param version_name: The version name to use in storing the dag to the DB.
     """
 
     partial: bool = False
diff --git a/airflow/models/dag_version.py b/airflow/models/dag_version.py
index 4bbd171b3f9..3bd8e31fcba 100644
--- a/airflow/models/dag_version.py
+++ b/airflow/models/dag_version.py
@@ -43,7 +43,6 @@ class DagVersion(Base):
     __tablename__ = "dag_version"
     id = Column(UUIDType(binary=False), primary_key=True, default=uuid6.uuid7)
     version_number = Column(Integer, nullable=False, default=1)
-    version_name = Column(StringID())
     dag_id = Column(StringID(), ForeignKey("dag.dag_id", ondelete="CASCADE"), 
nullable=False)
     dag_model = relationship("DagModel", back_populates="dag_versions")
     dag_code = relationship(
@@ -78,7 +77,6 @@ class DagVersion(Base):
         cls,
         *,
         dag_id: str,
-        version_name: str | None = None,
         version_number: int = 1,
         session: Session = NEW_SESSION,
     ) -> DagVersion:
@@ -88,7 +86,6 @@ class DagVersion(Base):
         Checks if a version of the DAG exists and increments the version 
number if it does.
 
         :param dag_id: The DAG ID.
-        :param version_name: The version name.
         :param version_number: The version number.
         :param session: The database session.
         :return: The DagVersion object.
@@ -102,7 +99,6 @@ class DagVersion(Base):
         dag_version = DagVersion(
             dag_id=dag_id,
             version_number=version_number,
-            version_name=version_name,
         )
         log.debug("Writing DagVersion %s to the DB", dag_version)
         session.add(dag_version)
@@ -136,7 +132,7 @@ class DagVersion(Base):
     def get_version(
         cls,
         dag_id: str,
-        version_number: int = 1,
+        version_number: int | None = None,
         *,
         session: Session = NEW_SESSION,
     ) -> DagVersion | None:
@@ -148,18 +144,13 @@ class DagVersion(Base):
         :param session: The database session.
         :return: The version of the DAG or None if not found.
         """
-        version_select_obj = (
-            select(cls)
-            .where(cls.dag_id == dag_id, cls.version_number == version_number)
-            .order_by(cls.version_number.desc())
-            .limit(1)
-        )
-        return session.scalar(version_select_obj)
+        version_select_obj = select(cls).where(cls.dag_id == dag_id)
+        if version_number:
+            version_select_obj = version_select_obj.where(cls.version_number 
== version_number)
+
+        return 
session.scalar(version_select_obj.order_by(cls.id.desc()).limit(1))
 
     @property
     def version(self) -> str:
         """A human-friendly representation of the version."""
-        name = f"{self.version_number}"
-        if self.version_name:
-            name = f"{self.version_name}-{self.version_number}"
-        return name
+        return f"{self.dag_id}-{self.version_number}"
diff --git a/airflow/models/serialized_dag.py b/airflow/models/serialized_dag.py
index 37c3478c863..0f2665008fd 100644
--- a/airflow/models/serialized_dag.py
+++ b/airflow/models/serialized_dag.py
@@ -206,7 +206,6 @@ class SerializedDagModel(Base):
             log.debug("Serialized DAG (%s) is unchanged. Skipping writing to 
DB", dag.dag_id)
             return False
         dagv = DagVersion.write_dag(
-            version_name=dag.version_name,
             dag_id=dag.dag_id,
             session=session,
         )
diff --git a/airflow/serialization/schema.json 
b/airflow/serialization/schema.json
index 1e7232aa81a..1cc9c42db07 100644
--- a/airflow/serialization/schema.json
+++ b/airflow/serialization/schema.json
@@ -173,7 +173,6 @@
         },
         "dag_display_name": { "type" : "string"},
         "description": { "type" : "string"},
-          "version_name": {"type":  "string"},
         "_concurrency": { "type" : "number"},
         "max_active_tasks": { "type" : "number"},
         "max_active_runs": { "type" : "number"},
diff --git a/airflow/ui/openapi-gen/queries/common.ts 
b/airflow/ui/openapi-gen/queries/common.ts
index 8b9af7821bf..7756d3f4200 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -385,15 +385,17 @@ export const useDagSourceServiceGetDagSourceKey =
 export const UseDagSourceServiceGetDagSourceKeyFn = (
   {
     accept,
-    fileToken,
+    dagId,
+    versionNumber,
   }: {
     accept?: string;
-    fileToken: string;
+    dagId: string;
+    versionNumber?: number;
   },
   queryKey?: Array<unknown>,
 ) => [
   useDagSourceServiceGetDagSourceKey,
-  ...(queryKey ?? [{ accept, fileToken }]),
+  ...(queryKey ?? [{ accept, dagId, versionNumber }]),
 ];
 export type DagStatsServiceGetDagStatsDefaultResponse = Awaited<
   ReturnType<typeof DagStatsService.getDagStats>
diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts 
b/airflow/ui/openapi-gen/queries/prefetch.ts
index 3fef2a26d11..4a87f1ad328 100644
--- a/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -472,7 +472,8 @@ export const prefetchUseDagRunServiceGetUpstreamAssetEvents 
= (
  * Get Dag Source
  * Get source code using file token.
  * @param data The data for the request.
- * @param data.fileToken
+ * @param data.dagId
+ * @param data.versionNumber
  * @param data.accept
  * @returns DAGSourceResponse Successful Response
  * @throws ApiError
@@ -481,18 +482,22 @@ export const prefetchUseDagSourceServiceGetDagSource = (
   queryClient: QueryClient,
   {
     accept,
-    fileToken,
+    dagId,
+    versionNumber,
   }: {
     accept?: string;
-    fileToken: string;
+    dagId: string;
+    versionNumber?: number;
   },
 ) =>
   queryClient.prefetchQuery({
     queryKey: Common.UseDagSourceServiceGetDagSourceKeyFn({
       accept,
-      fileToken,
+      dagId,
+      versionNumber,
     }),
-    queryFn: () => DagSourceService.getDagSource({ accept, fileToken }),
+    queryFn: () =>
+      DagSourceService.getDagSource({ accept, dagId, versionNumber }),
   });
 /**
  * Get Dag Stats
diff --git a/airflow/ui/openapi-gen/queries/queries.ts 
b/airflow/ui/openapi-gen/queries/queries.ts
index b5beed39cb5..91cebaa1e71 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -593,7 +593,8 @@ export const useDagRunServiceGetUpstreamAssetEvents = <
  * Get Dag Source
  * Get source code using file token.
  * @param data The data for the request.
- * @param data.fileToken
+ * @param data.dagId
+ * @param data.versionNumber
  * @param data.accept
  * @returns DAGSourceResponse Successful Response
  * @throws ApiError
@@ -605,21 +606,23 @@ export const useDagSourceServiceGetDagSource = <
 >(
   {
     accept,
-    fileToken,
+    dagId,
+    versionNumber,
   }: {
     accept?: string;
-    fileToken: string;
+    dagId: string;
+    versionNumber?: number;
   },
   queryKey?: TQueryKey,
   options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
 ) =>
   useQuery<TData, TError>({
     queryKey: Common.UseDagSourceServiceGetDagSourceKeyFn(
-      { accept, fileToken },
+      { accept, dagId, versionNumber },
       queryKey,
     ),
     queryFn: () =>
-      DagSourceService.getDagSource({ accept, fileToken }) as TData,
+      DagSourceService.getDagSource({ accept, dagId, versionNumber }) as TData,
     ...options,
   });
 /**
diff --git a/airflow/ui/openapi-gen/queries/suspense.ts 
b/airflow/ui/openapi-gen/queries/suspense.ts
index decdbdd3759..0cab9d15935 100644
--- a/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow/ui/openapi-gen/queries/suspense.ts
@@ -575,7 +575,8 @@ export const useDagRunServiceGetUpstreamAssetEventsSuspense 
= <
  * Get Dag Source
  * Get source code using file token.
  * @param data The data for the request.
- * @param data.fileToken
+ * @param data.dagId
+ * @param data.versionNumber
  * @param data.accept
  * @returns DAGSourceResponse Successful Response
  * @throws ApiError
@@ -587,21 +588,23 @@ export const useDagSourceServiceGetDagSourceSuspense = <
 >(
   {
     accept,
-    fileToken,
+    dagId,
+    versionNumber,
   }: {
     accept?: string;
-    fileToken: string;
+    dagId: string;
+    versionNumber?: number;
   },
   queryKey?: TQueryKey,
   options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
 ) =>
   useSuspenseQuery<TData, TError>({
     queryKey: Common.UseDagSourceServiceGetDagSourceKeyFn(
-      { accept, fileToken },
+      { accept, dagId, versionNumber },
       queryKey,
     ),
     queryFn: () =>
-      DagSourceService.getDagSource({ accept, fileToken }) as TData,
+      DagSourceService.getDagSource({ accept, dagId, versionNumber }) as TData,
     ...options,
   });
 /**
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts 
b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 1f83e434286..3cc3964b065 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -1586,9 +1586,24 @@ export const $DAGSourceResponse = {
       ],
       title: "Content",
     },
+    dag_id: {
+      type: "string",
+      title: "Dag Id",
+    },
+    version_number: {
+      anyOf: [
+        {
+          type: "integer",
+        },
+        {
+          type: "null",
+        },
+      ],
+      title: "Version Number",
+    },
   },
   type: "object",
-  required: ["content"],
+  required: ["content", "dag_id", "version_number"],
   title: "DAGSourceResponse",
   description: "DAG Source serializer for responses.",
 } as const;
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts 
b/airflow/ui/openapi-gen/requests/services.gen.ts
index c36bdb8c92b..51f9c449e55 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -1000,7 +1000,8 @@ export class DagSourceService {
    * Get Dag Source
    * Get source code using file token.
    * @param data The data for the request.
-   * @param data.fileToken
+   * @param data.dagId
+   * @param data.versionNumber
    * @param data.accept
    * @returns DAGSourceResponse Successful Response
    * @throws ApiError
@@ -1010,13 +1011,16 @@ export class DagSourceService {
   ): CancelablePromise<GetDagSourceResponse> {
     return __request(OpenAPI, {
       method: "GET",
-      url: "/public/dagSources/{file_token}",
+      url: "/public/dagSources/{dag_id}",
       path: {
-        file_token: data.fileToken,
+        dag_id: data.dagId,
       },
       headers: {
         accept: data.accept,
       },
+      query: {
+        version_number: data.versionNumber,
+      },
       errors: {
         400: "Bad Request",
         401: "Unauthorized",
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts 
b/airflow/ui/openapi-gen/requests/types.gen.ts
index 3611df15458..723e40b0cf7 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -351,6 +351,8 @@ export type DAGRunTypes = {
  */
 export type DAGSourceResponse = {
   content: string | null;
+  dag_id: string;
+  version_number: number | null;
 };
 
 /**
@@ -1242,7 +1244,8 @@ export type ClearDagRunResponse =
 
 export type GetDagSourceData = {
   accept?: string;
-  fileToken: string;
+  dagId: string;
+  versionNumber?: number | null;
 };
 
 export type GetDagSourceResponse = DAGSourceResponse;
@@ -2335,7 +2338,7 @@ export type $OpenApiTs = {
       };
     };
   };
-  "/public/dagSources/{file_token}": {
+  "/public/dagSources/{dag_id}": {
     get: {
       req: GetDagSourceData;
       res: {
diff --git a/airflow/ui/src/pages/DagsList/Dag/Code/Code.tsx 
b/airflow/ui/src/pages/DagsList/Dag/Code/Code.tsx
index 2af4b35d3b4..badb0ca4048 100644
--- a/airflow/ui/src/pages/DagsList/Dag/Code/Code.tsx
+++ b/airflow/ui/src/pages/DagsList/Dag/Code/Code.tsx
@@ -55,13 +55,9 @@ export const Code = () => {
     data: code,
     error: codeError,
     isLoading: isCodeLoading,
-  } = useDagSourceServiceGetDagSource(
-    {
-      fileToken: dag?.file_token ?? "",
-    },
-    undefined,
-    { enabled: Boolean(dag?.file_token) },
-  );
+  } = useDagSourceServiceGetDagSource({
+    dagId: dagId ?? "",
+  });
 
   // TODO: get default_wrap from config
   const [wrap, setWrap] = useState(false);
diff --git a/airflow/www/static/js/api/useDagCode.ts 
b/airflow/www/static/js/api/useDagCode.ts
index c6856cd7433..f6a3e8b4079 100644
--- a/airflow/www/static/js/api/useDagCode.ts
+++ b/airflow/www/static/js/api/useDagCode.ts
@@ -29,17 +29,13 @@ export default function useDagCode() {
   return useQuery(
     ["dagSourceQuery"],
     () => {
-      const fileToken = dagData?.fileToken || "";
-      const dagSourceApiUrl = getMetaValue("dag_source_api").replace(
-        "_FILE_TOKEN_",
-        fileToken
-      );
+      const dagSourceApiUrl = getMetaValue("dag_source_api");
       return axios.get<AxiosResponse, string>(dagSourceApiUrl, {
         headers: { Accept: "text/plain" },
       });
     },
     {
-      enabled: !!dagData?.fileToken,
+      enabled: !!dagData?.dagId,
     }
   );
 }
diff --git a/airflow/www/static/js/types/api-generated.ts 
b/airflow/www/static/js/types/api-generated.ts
index 1d68b650acb..b3ed0790efd 100644
--- a/airflow/www/static/js/types/api-generated.ts
+++ b/airflow/www/static/js/types/api-generated.ts
@@ -736,17 +736,13 @@ export interface paths {
   "/dagStats": {
     get: operations["get_dag_stats"];
   };
-  "/dagSources/{file_token}": {
+  "/dagSources/{dag_id}": {
     /** Get a source code using file token. */
     get: operations["get_dag_source"];
     parameters: {
       path: {
-        /**
-         * The key containing the encrypted path to the file. Encryption and 
decryption take place only on
-         * the server. This prevents the client from reading an non-DAG file. 
This also ensures API
-         * extensibility, because the format of encrypted data may change.
-         */
-        file_token: components["parameters"]["FileToken"];
+        /** The DAG ID. */
+        dag_id: components["parameters"]["DAGID"];
       };
     };
   };
@@ -2763,6 +2759,8 @@ export interface components {
     UpdateMask: string[];
     /** @description List of field for return. */
     ReturnFields: string[];
+    /** @description The version number. */
+    VersionNumber: number;
   };
   requestBodies: {};
   headers: {};
@@ -4949,12 +4947,12 @@ export interface operations {
   get_dag_source: {
     parameters: {
       path: {
-        /**
-         * The key containing the encrypted path to the file. Encryption and 
decryption take place only on
-         * the server. This prevents the client from reading an non-DAG file. 
This also ensures API
-         * extensibility, because the format of encrypted data may change.
-         */
-        file_token: components["parameters"]["FileToken"];
+        /** The DAG ID. */
+        dag_id: components["parameters"]["DAGID"];
+      };
+      query: {
+        /** The version number. */
+        version_number?: components["parameters"]["VersionNumber"];
       };
     };
     responses: {
@@ -5748,7 +5746,8 @@ export type GetDagStatsVariables = 
CamelCasedPropertiesDeep<
   operations["get_dag_stats"]["parameters"]["query"]
 >;
 export type GetDagSourceVariables = CamelCasedPropertiesDeep<
-  operations["get_dag_source"]["parameters"]["path"]
+  operations["get_dag_source"]["parameters"]["path"] &
+    operations["get_dag_source"]["parameters"]["query"]
 >;
 export type GetDagWarningsVariables = CamelCasedPropertiesDeep<
   operations["get_dag_warnings"]["parameters"]["query"]
diff --git a/airflow/www/templates/airflow/dag.html 
b/airflow/www/templates/airflow/dag.html
index 6969993e919..eb8df2f19be 100644
--- a/airflow/www/templates/airflow/dag.html
+++ b/airflow/www/templates/airflow/dag.html
@@ -98,7 +98,7 @@
   <meta name="set_mapped_task_instance_note" content="{{ 
url_for('/api/v1.airflow_api_connexion_endpoints_task_instance_endpoint_set_mapped_task_instance_note',
 dag_id=dag.dag_id, dag_run_id='_DAG_RUN_ID_', task_id='_TASK_ID_', map_index=0 
) }}">
   <meta name="set_dag_run_note" content="{{ 
url_for('/api/v1.airflow_api_connexion_endpoints_dag_run_endpoint_set_dag_run_note',
 dag_id=dag.dag_id, dag_run_id='_DAG_RUN_ID_') }}">
   <meta name="dag_api" content="{{ 
url_for('/api/v1.airflow_api_connexion_endpoints_dag_endpoint_get_dag', 
dag_id=dag.dag_id) }}">
-  <meta name="dag_source_api" content="{{ 
url_for('/api/v1.airflow_api_connexion_endpoints_dag_source_endpoint_get_dag_source',
 file_token='_FILE_TOKEN_') }}">
+  <meta name="dag_source_api" content="{{ 
url_for('/api/v1.airflow_api_connexion_endpoints_dag_source_endpoint_get_dag_source',
 dag_id=dag.dag_id) }}">
   <meta name="dag_details_api" content="{{ 
url_for('/api/v1.airflow_api_connexion_endpoints_dag_endpoint_get_dag_details', 
dag_id=dag.dag_id) }}">
   <meta name="assets_api" content="{{ 
url_for('/api/v1.airflow_api_connexion_endpoints_asset_endpoint_get_assets') 
}}">
   <meta name="event_logs_api" content="{{ 
url_for('/api/v1.airflow_api_connexion_endpoints_event_log_endpoint_get_event_logs')
 }}">
diff --git a/docs/apache-airflow/img/airflow_erd.sha256 
b/docs/apache-airflow/img/airflow_erd.sha256
index 34920196421..a824066eb3f 100644
--- a/docs/apache-airflow/img/airflow_erd.sha256
+++ b/docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
-1bf2f91dc87440cdbf7d1defb10a4693b47280b9a9b914dae3f1866eb2169629
\ No newline at end of file
+028d2fec22a15bbf5794e2fc7522eaf880a8b6293ead484780ef1a14e6cd9b48
\ No newline at end of file
diff --git a/docs/apache-airflow/img/airflow_erd.svg 
b/docs/apache-airflow/img/airflow_erd.svg
index 41fe37adaaa..04579984ec7 100644
--- a/docs/apache-airflow/img/airflow_erd.svg
+++ b/docs/apache-airflow/img/airflow_erd.svg
@@ -32,13 +32,13 @@
 <text text-anchor="start" x="115" y="-213.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
 <text text-anchor="start" x="120" y="-213.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(60)]</text>
 <polygon fill="none" stroke="black" points="70,-179 70,-204 351,-204 351,-179 
70,-179"/>
-<text text-anchor="start" x="75" y="-188.8" font-family="Helvetica,sans-Serif" 
font-size="14.00">execution_date</text>
-<text text-anchor="start" x="180" y="-188.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="185" y="-188.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
+<text text-anchor="start" x="75" y="-188.8" font-family="Helvetica,sans-Serif" 
font-size="14.00">extra</text>
+<text text-anchor="start" x="112" y="-188.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="117" y="-188.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TEXT]</text>
 <polygon fill="none" stroke="black" points="70,-154 70,-179 351,-179 351,-154 
70,-154"/>
-<text text-anchor="start" x="75" y="-163.8" font-family="Helvetica,sans-Serif" 
font-size="14.00">extra</text>
-<text text-anchor="start" x="112" y="-163.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="117" y="-163.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TEXT]</text>
+<text text-anchor="start" x="75" y="-163.8" font-family="Helvetica,sans-Serif" 
font-size="14.00">logical_date</text>
+<text text-anchor="start" x="157" y="-163.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="162" y="-163.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
 <polygon fill="none" stroke="black" points="70,-129 70,-154 351,-154 351,-129 
70,-129"/>
 <text text-anchor="start" x="75" y="-138.8" font-family="Helvetica,sans-Serif" 
font-size="14.00">map_index</text>
 <text text-anchor="start" x="151" y="-138.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
@@ -1528,39 +1528,35 @@
 <!-- dag_version -->
 <g id="node24" class="node">
 <title>dag_version</title>
-<polygon fill="none" stroke="black" points="534.5,-1897 534.5,-1925 
815.5,-1925 815.5,-1897 534.5,-1897"/>
-<text text-anchor="start" x="620.5" y="-1908.2" 
font-family="Helvetica,sans-Serif" font-weight="bold" 
font-size="16.00">dag_version</text>
-<polygon fill="none" stroke="black" points="534.5,-1872 534.5,-1897 
815.5,-1897 815.5,-1872 534.5,-1872"/>
-<text text-anchor="start" x="539.5" y="-1881.8" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">id</text>
-<text text-anchor="start" x="552.5" y="-1881.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="557.5" y="-1881.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [UUID]</text>
-<text text-anchor="start" x="609.5" y="-1881.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="534.5,-1847 534.5,-1872 
815.5,-1872 815.5,-1847 534.5,-1847"/>
-<text text-anchor="start" x="539.5" y="-1856.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">created_at</text>
-<text text-anchor="start" x="612.5" y="-1856.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="617.5" y="-1856.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
-<text text-anchor="start" x="713.5" y="-1856.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="534.5,-1822 534.5,-1847 
815.5,-1847 815.5,-1822 534.5,-1822"/>
-<text text-anchor="start" x="539.5" y="-1831.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">dag_id</text>
-<text text-anchor="start" x="585.5" y="-1831.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="590.5" y="-1831.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
-<text text-anchor="start" x="711.5" y="-1831.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="534.5,-1797 534.5,-1822 
815.5,-1822 815.5,-1797 534.5,-1797"/>
-<text text-anchor="start" x="539.5" y="-1806.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">version_name</text>
-<text text-anchor="start" x="636.5" y="-1806.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="641.5" y="-1806.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
-<polygon fill="none" stroke="black" points="534.5,-1772 534.5,-1797 
815.5,-1797 815.5,-1772 534.5,-1772"/>
-<text text-anchor="start" x="539.5" y="-1781.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">version_number</text>
-<text text-anchor="start" x="652.5" y="-1781.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="657.5" y="-1781.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
-<text text-anchor="start" x="734.5" y="-1781.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="534.5,-1898 534.5,-1926 
815.5,-1926 815.5,-1898 534.5,-1898"/>
+<text text-anchor="start" x="620.5" y="-1909.2" 
font-family="Helvetica,sans-Serif" font-weight="bold" 
font-size="16.00">dag_version</text>
+<polygon fill="none" stroke="black" points="534.5,-1873 534.5,-1898 
815.5,-1898 815.5,-1873 534.5,-1873"/>
+<text text-anchor="start" x="539.5" y="-1882.8" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">id</text>
+<text text-anchor="start" x="552.5" y="-1882.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="557.5" y="-1882.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [UUID]</text>
+<text text-anchor="start" x="609.5" y="-1882.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="534.5,-1848 534.5,-1873 
815.5,-1873 815.5,-1848 534.5,-1848"/>
+<text text-anchor="start" x="539.5" y="-1857.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">created_at</text>
+<text text-anchor="start" x="612.5" y="-1857.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="617.5" y="-1857.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
+<text text-anchor="start" x="713.5" y="-1857.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="534.5,-1823 534.5,-1848 
815.5,-1848 815.5,-1823 534.5,-1823"/>
+<text text-anchor="start" x="539.5" y="-1832.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">dag_id</text>
+<text text-anchor="start" x="585.5" y="-1832.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="590.5" y="-1832.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
+<text text-anchor="start" x="711.5" y="-1832.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="534.5,-1798 534.5,-1823 
815.5,-1823 815.5,-1798 534.5,-1798"/>
+<text text-anchor="start" x="539.5" y="-1807.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">version_number</text>
+<text text-anchor="start" x="652.5" y="-1807.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="657.5" y="-1807.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<text text-anchor="start" x="734.5" y="-1807.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
 </g>
 <!-- dag&#45;&#45;dag_version -->
 <g id="edge19" class="edge">
 <title>dag&#45;&#45;dag_version</title>
-<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" 
d="M420.27,-1989.37C444.28,-1971.62 468.82,-1954.47 493,-1939 503.47,-1932.3 
514.51,-1925.73 525.76,-1919.38"/>
-<text text-anchor="start" x="494.76" y="-1908.18" font-family="Times,serif" 
font-size="14.00">0..N</text>
-<text text-anchor="start" x="420.27" y="-1978.17" font-family="Times,serif" 
font-size="14.00">1</text>
+<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" 
d="M420.16,-1987.33C444.08,-1970.07 468.64,-1953.58 493,-1939 503.38,-1932.79 
514.35,-1926.82 525.55,-1921.14"/>
+<text text-anchor="start" x="494.55" y="-1909.94" font-family="Times,serif" 
font-size="14.00">0..N</text>
+<text text-anchor="start" x="420.16" y="-1976.13" font-family="Times,serif" 
font-size="14.00">1</text>
 </g>
 <!-- dag_tag -->
 <g id="node25" class="node">
@@ -1588,30 +1584,30 @@
 <!-- dag_owner_attributes -->
 <g id="node26" class="node">
 <title>dag_owner_attributes</title>
-<polygon fill="none" stroke="black" points="545.5,-1718 545.5,-1746 
803.5,-1746 803.5,-1718 545.5,-1718"/>
-<text text-anchor="start" x="576.5" y="-1729.2" 
font-family="Helvetica,sans-Serif" font-weight="bold" 
font-size="16.00">dag_owner_attributes</text>
+<polygon fill="none" stroke="black" points="545.5,-1743 545.5,-1771 
803.5,-1771 803.5,-1743 545.5,-1743"/>
+<text text-anchor="start" x="576.5" y="-1754.2" 
font-family="Helvetica,sans-Serif" font-weight="bold" 
font-size="16.00">dag_owner_attributes</text>
+<polygon fill="none" stroke="black" points="545.5,-1718 545.5,-1743 
803.5,-1743 803.5,-1718 545.5,-1718"/>
+<text text-anchor="start" x="550.5" y="-1727.8" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">dag_id</text>
+<text text-anchor="start" x="596.5" y="-1727.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="601.5" y="-1727.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
+<text text-anchor="start" x="722.5" y="-1727.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
 <polygon fill="none" stroke="black" points="545.5,-1693 545.5,-1718 
803.5,-1718 803.5,-1693 545.5,-1693"/>
-<text text-anchor="start" x="550.5" y="-1702.8" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">dag_id</text>
-<text text-anchor="start" x="596.5" y="-1702.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="601.5" y="-1702.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
-<text text-anchor="start" x="722.5" y="-1702.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<text text-anchor="start" x="550.5" y="-1702.8" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">owner</text>
+<text text-anchor="start" x="593.5" y="-1702.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="598.5" y="-1702.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(500)]</text>
+<text text-anchor="start" x="719.5" y="-1702.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
 <polygon fill="none" stroke="black" points="545.5,-1668 545.5,-1693 
803.5,-1693 803.5,-1668 545.5,-1668"/>
-<text text-anchor="start" x="550.5" y="-1677.8" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">owner</text>
-<text text-anchor="start" x="593.5" y="-1677.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="598.5" y="-1677.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(500)]</text>
-<text text-anchor="start" x="719.5" y="-1677.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="545.5,-1643 545.5,-1668 
803.5,-1668 803.5,-1643 545.5,-1643"/>
-<text text-anchor="start" x="550.5" y="-1652.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">link</text>
-<text text-anchor="start" x="575.5" y="-1652.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="580.5" y="-1652.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(500)]</text>
-<text text-anchor="start" x="701.5" y="-1652.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<text text-anchor="start" x="550.5" y="-1677.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">link</text>
+<text text-anchor="start" x="575.5" y="-1677.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="580.5" y="-1677.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(500)]</text>
+<text text-anchor="start" x="701.5" y="-1677.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
 </g>
 <!-- dag&#45;&#45;dag_owner_attributes -->
 <g id="edge21" class="edge">
 <title>dag&#45;&#45;dag_owner_attributes</title>
-<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" 
d="M414.33,-1859.28C450.12,-1808.44 480.08,-1768.86 493,-1759 506.34,-1748.81 
521.45,-1740.15 537.1,-1732.81"/>
-<text text-anchor="start" x="506.1" y="-1721.61" font-family="Times,serif" 
font-size="14.00">0..N</text>
-<text text-anchor="start" x="404.33" y="-1848.08" font-family="Times,serif" 
font-size="14.00">1</text>
+<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" 
d="M420.06,-1862.37C447.87,-1827.67 473.46,-1799.52 493,-1785 506.5,-1774.97 
521.7,-1766.37 537.4,-1759.02"/>
+<text text-anchor="start" x="506.4" y="-1747.82" font-family="Times,serif" 
font-size="14.00">0..N</text>
+<text text-anchor="start" x="420.06" y="-1851.17" font-family="Times,serif" 
font-size="14.00">1</text>
 </g>
 <!-- dag_warning -->
 <g id="node27" class="node">
@@ -1649,9 +1645,9 @@
 <!-- dag_version&#45;&#45;task_instance -->
 <g id="edge26" class="edge">
 <title>dag_version&#45;&#45;task_instance</title>
-<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" 
d="M823.29,-1913.18C834.96,-1921.03 846.06,-1929.63 856,-1939 903.38,-1983.65 
895.75,-2010.02 929,-2066 1051.69,-2272.57 1194.13,-2504.42 1300.88,-2676.65"/>
-<text text-anchor="start" x="1269.88" y="-2665.45" font-family="Times,serif" 
font-size="14.00">0..N</text>
-<text text-anchor="start" x="823.29" y="-1901.98" font-family="Times,serif" 
font-size="14.00">{0,1}</text>
+<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" 
d="M823.24,-1914.45C835,-1921.71 846.12,-1929.86 856,-1939 907.5,-1986.66 
894.21,-2018.06 929,-2079 1048.03,-2287.47 1192.47,-2518.21 1300.94,-2687.55"/>
+<text text-anchor="start" x="1269.94" y="-2676.35" font-family="Times,serif" 
font-size="14.00">0..N</text>
+<text text-anchor="start" x="823.24" y="-1918.25" font-family="Times,serif" 
font-size="14.00">{0,1}</text>
 </g>
 <!-- dag_run -->
 <g id="node28" class="node">
@@ -1752,102 +1748,102 @@
 <!-- dag_version&#45;&#45;dag_run -->
 <g id="edge23" class="edge">
 <title>dag_version&#45;&#45;dag_run</title>
-<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" 
d="M823.14,-1904.5C835.88,-1914.28 847.23,-1925.7 856,-1939 894.52,-1997.45 
883.54,-2498.51 892,-2568 921.36,-2809.06 973.46,-3079.93 1014.98,-3277.31"/>
-<text text-anchor="start" x="983.98" y="-3266.11" font-family="Times,serif" 
font-size="14.00">0..N</text>
-<text text-anchor="start" x="823.14" y="-1893.3" font-family="Times,serif" 
font-size="14.00">{0,1}</text>
+<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" 
d="M823.28,-1907.13C835.9,-1915.94 847.19,-1926.45 856,-1939 896.67,-1996.93 
883.42,-2504.74 892,-2575 921.15,-2813.65 973.02,-3081.66 1014.51,-3277.47"/>
+<text text-anchor="start" x="983.51" y="-3266.27" font-family="Times,serif" 
font-size="14.00">0..N</text>
+<text text-anchor="start" x="823.28" y="-1895.93" font-family="Times,serif" 
font-size="14.00">{0,1}</text>
 </g>
 <!-- dag_code -->
 <g id="node29" class="node">
 <title>dag_code</title>
-<polygon fill="none" stroke="black" points="947.5,-2025 947.5,-2053 
1209.5,-2053 1209.5,-2025 947.5,-2025"/>
-<text text-anchor="start" x="1036" y="-2036.2" 
font-family="Helvetica,sans-Serif" font-weight="bold" 
font-size="16.00">dag_code</text>
-<polygon fill="none" stroke="black" points="947.5,-2000 947.5,-2025 
1209.5,-2025 1209.5,-2000 947.5,-2000"/>
-<text text-anchor="start" x="952.5" y="-2009.8" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">id</text>
-<text text-anchor="start" x="965.5" y="-2009.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="970.5" y="-2009.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [UUID]</text>
-<text text-anchor="start" x="1022.5" y="-2009.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="947.5,-1975 947.5,-2000 
1209.5,-2000 1209.5,-1975 947.5,-1975"/>
-<text text-anchor="start" x="952.5" y="-1984.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">created_at</text>
-<text text-anchor="start" x="1025.5" y="-1984.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1030.5" y="-1984.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
-<text text-anchor="start" x="1126.5" y="-1984.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="947.5,-1950 947.5,-1975 
1209.5,-1975 1209.5,-1950 947.5,-1950"/>
-<text text-anchor="start" x="952.5" y="-1959.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">dag_version_id</text>
-<text text-anchor="start" x="1056.5" y="-1959.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1061.5" y="-1959.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [UUID]</text>
-<text text-anchor="start" x="1113.5" y="-1959.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="947.5,-1925 947.5,-1950 
1209.5,-1950 1209.5,-1925 947.5,-1925"/>
-<text text-anchor="start" x="952.5" y="-1934.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">fileloc</text>
-<text text-anchor="start" x="993.5" y="-1934.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="998.5" y="-1934.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(2000)]</text>
-<text text-anchor="start" x="1128.5" y="-1934.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="947.5,-1900 947.5,-1925 
1209.5,-1925 1209.5,-1900 947.5,-1900"/>
-<text text-anchor="start" x="952.5" y="-1909.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">fileloc_hash</text>
-<text text-anchor="start" x="1034.5" y="-1909.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1039.5" y="-1909.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [BIGINT]</text>
-<text text-anchor="start" x="1103.5" y="-1909.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="947.5,-1875 947.5,-1900 
1209.5,-1900 1209.5,-1875 947.5,-1875"/>
-<text text-anchor="start" x="952.5" y="-1884.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">source_code</text>
-<text text-anchor="start" x="1038.5" y="-1884.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1043.5" y="-1884.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TEXT]</text>
-<text text-anchor="start" x="1093.5" y="-1884.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="947.5,-2038 947.5,-2066 
1209.5,-2066 1209.5,-2038 947.5,-2038"/>
+<text text-anchor="start" x="1036" y="-2049.2" 
font-family="Helvetica,sans-Serif" font-weight="bold" 
font-size="16.00">dag_code</text>
+<polygon fill="none" stroke="black" points="947.5,-2013 947.5,-2038 
1209.5,-2038 1209.5,-2013 947.5,-2013"/>
+<text text-anchor="start" x="952.5" y="-2022.8" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">id</text>
+<text text-anchor="start" x="965.5" y="-2022.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="970.5" y="-2022.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [UUID]</text>
+<text text-anchor="start" x="1022.5" y="-2022.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="947.5,-1988 947.5,-2013 
1209.5,-2013 1209.5,-1988 947.5,-1988"/>
+<text text-anchor="start" x="952.5" y="-1997.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">created_at</text>
+<text text-anchor="start" x="1025.5" y="-1997.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1030.5" y="-1997.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
+<text text-anchor="start" x="1126.5" y="-1997.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="947.5,-1963 947.5,-1988 
1209.5,-1988 1209.5,-1963 947.5,-1963"/>
+<text text-anchor="start" x="952.5" y="-1972.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">dag_version_id</text>
+<text text-anchor="start" x="1056.5" y="-1972.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1061.5" y="-1972.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [UUID]</text>
+<text text-anchor="start" x="1113.5" y="-1972.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="947.5,-1938 947.5,-1963 
1209.5,-1963 1209.5,-1938 947.5,-1938"/>
+<text text-anchor="start" x="952.5" y="-1947.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">fileloc</text>
+<text text-anchor="start" x="993.5" y="-1947.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="998.5" y="-1947.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(2000)]</text>
+<text text-anchor="start" x="1128.5" y="-1947.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="947.5,-1913 947.5,-1938 
1209.5,-1938 1209.5,-1913 947.5,-1913"/>
+<text text-anchor="start" x="952.5" y="-1922.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">fileloc_hash</text>
+<text text-anchor="start" x="1034.5" y="-1922.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1039.5" y="-1922.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [BIGINT]</text>
+<text text-anchor="start" x="1103.5" y="-1922.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="947.5,-1888 947.5,-1913 
1209.5,-1913 1209.5,-1888 947.5,-1888"/>
+<text text-anchor="start" x="952.5" y="-1897.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">source_code</text>
+<text text-anchor="start" x="1038.5" y="-1897.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1043.5" y="-1897.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TEXT]</text>
+<text text-anchor="start" x="1093.5" y="-1897.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
 </g>
 <!-- dag_version&#45;&#45;dag_code -->
 <g id="edge24" class="edge">
 <title>dag_version&#45;&#45;dag_code</title>
-<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" 
d="M823.19,-1891.25C861.16,-1902.11 901.86,-1913.75 939.32,-1924.47"/>
-<text text-anchor="start" x="908.32" y="-1913.27" font-family="Times,serif" 
font-size="14.00">0..N</text>
-<text text-anchor="start" x="823.19" y="-1880.05" font-family="Times,serif" 
font-size="14.00">1</text>
+<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" 
d="M823.19,-1904.25C861.16,-1915.11 901.86,-1926.75 939.32,-1937.47"/>
+<text text-anchor="start" x="908.32" y="-1926.27" font-family="Times,serif" 
font-size="14.00">0..N</text>
+<text text-anchor="start" x="823.19" y="-1908.05" font-family="Times,serif" 
font-size="14.00">1</text>
 </g>
 <!-- serialized_dag -->
 <g id="node30" class="node">
 <title>serialized_dag</title>
-<polygon fill="none" stroke="black" points="943.5,-1821 943.5,-1849 
1213.5,-1849 1213.5,-1821 943.5,-1821"/>
-<text text-anchor="start" x="1015" y="-1832.2" 
font-family="Helvetica,sans-Serif" font-weight="bold" 
font-size="16.00">serialized_dag</text>
-<polygon fill="none" stroke="black" points="943.5,-1796 943.5,-1821 
1213.5,-1821 1213.5,-1796 943.5,-1796"/>
-<text text-anchor="start" x="948.5" y="-1805.8" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">id</text>
-<text text-anchor="start" x="961.5" y="-1805.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="966.5" y="-1805.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [UUID]</text>
-<text text-anchor="start" x="1018.5" y="-1805.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="943.5,-1771 943.5,-1796 
1213.5,-1796 1213.5,-1771 943.5,-1771"/>
-<text text-anchor="start" x="948.5" y="-1780.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">created_at</text>
-<text text-anchor="start" x="1021.5" y="-1780.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1026.5" y="-1780.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
-<text text-anchor="start" x="1122.5" y="-1780.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="943.5,-1746 943.5,-1771 
1213.5,-1771 1213.5,-1746 943.5,-1746"/>
-<text text-anchor="start" x="948.5" y="-1755.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">dag_hash</text>
-<text text-anchor="start" x="1015.5" y="-1755.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1020.5" y="-1755.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(32)]</text>
-<text text-anchor="start" x="1132.5" y="-1755.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="943.5,-1721 943.5,-1746 
1213.5,-1746 1213.5,-1721 943.5,-1721"/>
-<text text-anchor="start" x="948.5" y="-1730.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">dag_id</text>
-<text text-anchor="start" x="994.5" y="-1730.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="999.5" y="-1730.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
-<text text-anchor="start" x="1120.5" y="-1730.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="943.5,-1696 943.5,-1721 
1213.5,-1721 1213.5,-1696 943.5,-1696"/>
-<text text-anchor="start" x="948.5" y="-1705.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">dag_version_id</text>
-<text text-anchor="start" x="1052.5" y="-1705.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1057.5" y="-1705.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [UUID]</text>
-<text text-anchor="start" x="1109.5" y="-1705.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="943.5,-1671 943.5,-1696 
1213.5,-1696 1213.5,-1671 943.5,-1671"/>
-<text text-anchor="start" x="948.5" y="-1680.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">data</text>
-<text text-anchor="start" x="979.5" y="-1680.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="984.5" y="-1680.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [JSON]</text>
-<polygon fill="none" stroke="black" points="943.5,-1646 943.5,-1671 
1213.5,-1671 1213.5,-1646 943.5,-1646"/>
-<text text-anchor="start" x="948.5" y="-1655.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">data_compressed</text>
-<text text-anchor="start" x="1070.5" y="-1655.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1075.5" y="-1655.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [BYTEA]</text>
-<polygon fill="none" stroke="black" points="943.5,-1621 943.5,-1646 
1213.5,-1646 1213.5,-1621 943.5,-1621"/>
-<text text-anchor="start" x="948.5" y="-1630.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">processor_subdir</text>
-<text text-anchor="start" x="1067.5" y="-1630.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1072.5" y="-1630.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(2000)]</text>
+<polygon fill="none" stroke="black" points="943.5,-1834 943.5,-1862 
1213.5,-1862 1213.5,-1834 943.5,-1834"/>
+<text text-anchor="start" x="1015" y="-1845.2" 
font-family="Helvetica,sans-Serif" font-weight="bold" 
font-size="16.00">serialized_dag</text>
+<polygon fill="none" stroke="black" points="943.5,-1809 943.5,-1834 
1213.5,-1834 1213.5,-1809 943.5,-1809"/>
+<text text-anchor="start" x="948.5" y="-1818.8" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">id</text>
+<text text-anchor="start" x="961.5" y="-1818.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="966.5" y="-1818.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [UUID]</text>
+<text text-anchor="start" x="1018.5" y="-1818.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="943.5,-1784 943.5,-1809 
1213.5,-1809 1213.5,-1784 943.5,-1784"/>
+<text text-anchor="start" x="948.5" y="-1793.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">created_at</text>
+<text text-anchor="start" x="1021.5" y="-1793.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1026.5" y="-1793.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
+<text text-anchor="start" x="1122.5" y="-1793.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="943.5,-1759 943.5,-1784 
1213.5,-1784 1213.5,-1759 943.5,-1759"/>
+<text text-anchor="start" x="948.5" y="-1768.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">dag_hash</text>
+<text text-anchor="start" x="1015.5" y="-1768.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1020.5" y="-1768.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(32)]</text>
+<text text-anchor="start" x="1132.5" y="-1768.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="943.5,-1734 943.5,-1759 
1213.5,-1759 1213.5,-1734 943.5,-1734"/>
+<text text-anchor="start" x="948.5" y="-1743.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">dag_id</text>
+<text text-anchor="start" x="994.5" y="-1743.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="999.5" y="-1743.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
+<text text-anchor="start" x="1120.5" y="-1743.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="943.5,-1709 943.5,-1734 
1213.5,-1734 1213.5,-1709 943.5,-1709"/>
+<text text-anchor="start" x="948.5" y="-1718.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">dag_version_id</text>
+<text text-anchor="start" x="1052.5" y="-1718.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1057.5" y="-1718.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [UUID]</text>
+<text text-anchor="start" x="1109.5" y="-1718.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="943.5,-1684 943.5,-1709 
1213.5,-1709 1213.5,-1684 943.5,-1684"/>
+<text text-anchor="start" x="948.5" y="-1693.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">data</text>
+<text text-anchor="start" x="979.5" y="-1693.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="984.5" y="-1693.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [JSON]</text>
+<polygon fill="none" stroke="black" points="943.5,-1659 943.5,-1684 
1213.5,-1684 1213.5,-1659 943.5,-1659"/>
+<text text-anchor="start" x="948.5" y="-1668.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">data_compressed</text>
+<text text-anchor="start" x="1070.5" y="-1668.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1075.5" y="-1668.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [BYTEA]</text>
+<polygon fill="none" stroke="black" points="943.5,-1634 943.5,-1659 
1213.5,-1659 1213.5,-1634 943.5,-1634"/>
+<text text-anchor="start" x="948.5" y="-1643.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">processor_subdir</text>
+<text text-anchor="start" x="1067.5" y="-1643.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1072.5" y="-1643.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(2000)]</text>
 </g>
 <!-- dag_version&#45;&#45;serialized_dag -->
 <g id="edge25" class="edge">
 <title>dag_version&#45;&#45;serialized_dag</title>
-<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" 
d="M823.19,-1807.12C859.81,-1796.73 898.98,-1785.63 935.33,-1775.32"/>
-<text text-anchor="start" x="904.33" y="-1764.12" font-family="Times,serif" 
font-size="14.00">0..N</text>
-<text text-anchor="start" x="823.19" y="-1795.92" font-family="Times,serif" 
font-size="14.00">1</text>
+<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" 
d="M823.19,-1820.12C859.81,-1809.73 898.98,-1798.63 935.33,-1788.32"/>
+<text text-anchor="start" x="904.33" y="-1777.12" font-family="Times,serif" 
font-size="14.00">0..N</text>
+<text text-anchor="start" x="823.19" y="-1808.92" font-family="Times,serif" 
font-size="14.00">1</text>
 </g>
 <!-- dag_run&#45;&#45;dagrun_asset_event -->
 <g id="edge28" class="edge">
@@ -2067,39 +2063,39 @@
 <text text-anchor="start" x="1289.35" y="-3810.87" font-family="Times,serif" 
font-size="14.00">0..N</text>
 <text text-anchor="start" x="856.08" y="-3813.58" font-family="Times,serif" 
font-size="14.00">1</text>
 </g>
-<!-- session -->
+<!-- alembic_version -->
 <g id="node41" class="node">
-<title>session</title>
-<polygon fill="none" stroke="black" points="106,-3834 106,-3862 314,-3862 
314,-3834 106,-3834"/>
-<text text-anchor="start" x="176" y="-3845.2" 
font-family="Helvetica,sans-Serif" font-weight="bold" 
font-size="16.00">session</text>
-<polygon fill="none" stroke="black" points="106,-3809 106,-3834 314,-3834 
314,-3809 106,-3809"/>
-<text text-anchor="start" x="111" y="-3818.8" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">id</text>
-<text text-anchor="start" x="124" y="-3818.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="129" y="-3818.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
-<text text-anchor="start" x="206" y="-3818.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="106,-3784 106,-3809 314,-3809 
314,-3784 106,-3784"/>
-<text text-anchor="start" x="111" y="-3793.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">data</text>
-<text text-anchor="start" x="142" y="-3793.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="147" y="-3793.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [BYTEA]</text>
-<polygon fill="none" stroke="black" points="106,-3759 106,-3784 314,-3784 
314,-3759 106,-3759"/>
-<text text-anchor="start" x="111" y="-3768.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">expiry</text>
-<text text-anchor="start" x="155" y="-3768.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="160" y="-3768.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
-<polygon fill="none" stroke="black" points="106,-3734 106,-3759 314,-3759 
314,-3734 106,-3734"/>
-<text text-anchor="start" x="111" y="-3743.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">session_id</text>
-<text text-anchor="start" x="183" y="-3743.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="188" y="-3743.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(255)]</text>
+<title>alembic_version</title>
+<polygon fill="none" stroke="black" points="64,-3759 64,-3787 357,-3787 
357,-3759 64,-3759"/>
+<text text-anchor="start" x="138" y="-3770.2" 
font-family="Helvetica,sans-Serif" font-weight="bold" 
font-size="16.00">alembic_version</text>
+<polygon fill="none" stroke="black" points="64,-3734 64,-3759 357,-3759 
357,-3734 64,-3734"/>
+<text text-anchor="start" x="69" y="-3743.8" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">version_num</text>
+<text text-anchor="start" x="159" y="-3743.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="164" y="-3743.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(32)]</text>
+<text text-anchor="start" x="276" y="-3743.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
 </g>
-<!-- alembic_version -->
+<!-- session -->
 <g id="node42" class="node">
-<title>alembic_version</title>
-<polygon fill="none" stroke="black" points="64,-3989 64,-4017 357,-4017 
357,-3989 64,-3989"/>
-<text text-anchor="start" x="138" y="-4000.2" 
font-family="Helvetica,sans-Serif" font-weight="bold" 
font-size="16.00">alembic_version</text>
-<polygon fill="none" stroke="black" points="64,-3964 64,-3989 357,-3989 
357,-3964 64,-3964"/>
-<text text-anchor="start" x="69" y="-3973.8" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">version_num</text>
-<text text-anchor="start" x="159" y="-3973.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="164" y="-3973.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(32)]</text>
-<text text-anchor="start" x="276" y="-3973.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<title>session</title>
+<polygon fill="none" stroke="black" points="106,-3990 106,-4018 314,-4018 
314,-3990 106,-3990"/>
+<text text-anchor="start" x="176" y="-4001.2" 
font-family="Helvetica,sans-Serif" font-weight="bold" 
font-size="16.00">session</text>
+<polygon fill="none" stroke="black" points="106,-3965 106,-3990 314,-3990 
314,-3965 106,-3965"/>
+<text text-anchor="start" x="111" y="-3974.8" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">id</text>
+<text text-anchor="start" x="124" y="-3974.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="129" y="-3974.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<text text-anchor="start" x="206" y="-3974.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="106,-3940 106,-3965 314,-3965 
314,-3940 106,-3940"/>
+<text text-anchor="start" x="111" y="-3949.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">data</text>
+<text text-anchor="start" x="142" y="-3949.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="147" y="-3949.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [BYTEA]</text>
+<polygon fill="none" stroke="black" points="106,-3915 106,-3940 314,-3940 
314,-3915 106,-3915"/>
+<text text-anchor="start" x="111" y="-3924.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">expiry</text>
+<text text-anchor="start" x="155" y="-3924.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="160" y="-3924.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
+<polygon fill="none" stroke="black" points="106,-3890 106,-3915 314,-3915 
314,-3890 106,-3890"/>
+<text text-anchor="start" x="111" y="-3899.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">session_id</text>
+<text text-anchor="start" x="183" y="-3899.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="188" y="-3899.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(255)]</text>
 </g>
 <!-- ab_user -->
 <g id="node43" class="node">
diff --git 
a/providers/tests/fab/auth_manager/api_endpoints/test_dag_source_endpoint.py 
b/providers/tests/fab/auth_manager/api_endpoints/test_dag_source_endpoint.py
index bc4ba7041ca..9b6ed6df160 100644
--- a/providers/tests/fab/auth_manager/api_endpoints/test_dag_source_endpoint.py
+++ b/providers/tests/fab/auth_manager/api_endpoints/test_dag_source_endpoint.py
@@ -97,25 +97,13 @@ class TestGetSource:
         docstring = ast.get_docstring(module)
         return docstring
 
-    def test_should_respond_406(self, url_safe_serializer):
-        dagbag = DagBag(dag_folder=EXAMPLE_DAG_FILE)
-        dagbag.sync_to_db()
-        test_dag: DAG = dagbag.dags[TEST_DAG_ID]
-
-        url = 
f"/api/v1/dagSources/{url_safe_serializer.dumps(test_dag.fileloc)}"
-        response = self.client.get(
-            url, headers={"Accept": "image/webp"}, 
environ_overrides={"REMOTE_USER": "test"}
-        )
-
-        assert 406 == response.status_code
-
     def test_should_respond_403_not_readable(self, url_safe_serializer):
         dagbag = DagBag(dag_folder=EXAMPLE_DAG_FILE)
         dagbag.sync_to_db()
         dag: DAG = dagbag.dags[NOT_READABLE_DAG_ID]
 
         response = self.client.get(
-            f"/api/v1/dagSources/{url_safe_serializer.dumps(dag.fileloc)}",
+            f"/api/v1/dagSources/{dag.dag_id}",
             headers={"Accept": "text/plain"},
             environ_overrides={"REMOTE_USER": "test"},
         )
@@ -132,7 +120,7 @@ class TestGetSource:
         dag: DAG = dagbag.dags[TEST_MULTIPLE_DAGS_ID]
 
         response = self.client.get(
-            f"/api/v1/dagSources/{url_safe_serializer.dumps(dag.fileloc)}",
+            f"/api/v1/dagSources/{dag.dag_id}",
             headers={"Accept": "text/plain"},
             environ_overrides={"REMOTE_USER": "test"},
         )
diff --git a/task_sdk/src/airflow/sdk/definitions/dag.py 
b/task_sdk/src/airflow/sdk/definitions/dag.py
index d427ddde798..62caf682b77 100644
--- a/task_sdk/src/airflow/sdk/definitions/dag.py
+++ b/task_sdk/src/airflow/sdk/definitions/dag.py
@@ -355,7 +355,6 @@ class DAG:
         **Warning**: A fail stop dag can only have tasks with the default 
trigger rule ("all_success").
         An exception will be thrown if any task in a fail stop dag has a non 
default trigger rule.
     :param dag_display_name: The display name of the DAG which appears on the 
UI.
-    :param version_name: The version name of the DAG. This is used to identify 
the version of the DAG.
     """
 
     __serialized_fields: ClassVar[frozenset[str] | None] = None
@@ -438,10 +437,6 @@ class DAG:
 
     has_on_success_callback: bool = attrs.field(init=False)
     has_on_failure_callback: bool = attrs.field(init=False)
-    version_name: str | None = attrs.field(
-        default=None,
-        validator=attrs.validators.optional(attrs.validators.instance_of(str)),
-    )
 
     def __attrs_post_init__(self):
         from airflow.utils import timezone
@@ -1069,7 +1064,6 @@ if TYPE_CHECKING:
         auto_register: bool = True,
         fail_stop: bool = False,
         dag_display_name: str | None = None,
-        version_name: str | None = None,
     ) -> Callable[[Callable], Callable[..., DAG]]:
         """
         Python dag decorator which wraps a function into an Airflow DAG.
diff --git a/tests/api_connexion/endpoints/test_dag_source_endpoint.py 
b/tests/api_connexion/endpoints/test_dag_source_endpoint.py
index 4b8c4f3c473..9218ae9e639 100644
--- a/tests/api_connexion/endpoints/test_dag_source_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_source_endpoint.py
@@ -16,28 +16,21 @@
 # under the License.
 from __future__ import annotations
 
-import ast
-import os
-from typing import TYPE_CHECKING
-
 import pytest
+from sqlalchemy import select
 
 from airflow.models import DagBag
+from airflow.models.dagcode import DagCode
+from airflow.models.serialized_dag import SerializedDagModel
 
 from tests_common.test_utils.api_connexion_utils import assert_401, 
create_user, delete_user
-from tests_common.test_utils.db import clear_db_dag_code, clear_db_dags, 
clear_db_serialized_dags
+from tests_common.test_utils.db import clear_db_dags
 
 pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode]
 
-if TYPE_CHECKING:
-    from airflow.models.dag import DAG
 
-ROOT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, 
os.pardir))
-EXAMPLE_DAG_FILE = os.path.join("airflow", "example_dags", 
"example_bash_operator.py")
 EXAMPLE_DAG_ID = "example_bash_operator"
 TEST_DAG_ID = "latest_only"
-NOT_READABLE_DAG_ID = "latest_only_with_trigger"
-TEST_MULTIPLE_DAGS_ID = "asset_produces_1"
 
 
 @pytest.fixture(scope="module")
@@ -56,88 +49,115 @@ def configured_app(minimal_app_for_api):
     delete_user(app, username="test_no_permissions")
 
 
+@pytest.fixture
+def test_dag():
+    dagbag = DagBag(include_examples=True)
+    dagbag.sync_to_db()
+    return dagbag.dags[TEST_DAG_ID]
+
+
 class TestGetSource:
     @pytest.fixture(autouse=True)
     def setup_attrs(self, configured_app) -> None:
         self.app = configured_app
         self.client = self.app.test_client()  # type:ignore
-        self.clear_db()
-
-    def teardown_method(self) -> None:
-        self.clear_db()
+        clear_db_dags()
 
-    @staticmethod
-    def clear_db():
+    def teardown_method(self):
         clear_db_dags()
-        clear_db_serialized_dags()
-        clear_db_dag_code()
 
     @staticmethod
-    def _get_dag_file_docstring(fileloc: str) -> str | None:
+    def _get_dag_file_code(fileloc: str) -> str | None:
         with open(fileloc) as f:
             file_contents = f.read()
-        module = ast.parse(file_contents)
-        docstring = ast.get_docstring(module)
-        return docstring
+        return file_contents
 
-    def test_should_respond_200_text(self, url_safe_serializer):
-        dagbag = DagBag(dag_folder=EXAMPLE_DAG_FILE)
-        dagbag.sync_to_db()
-        test_dag: DAG = dagbag.dags[TEST_DAG_ID]
-        dag_docstring = self._get_dag_file_docstring(test_dag.fileloc)
+    def test_should_respond_200_text(self, test_dag):
+        dag_content = self._get_dag_file_code(test_dag.fileloc)
 
-        url = 
f"/api/v1/dagSources/{url_safe_serializer.dumps(test_dag.fileloc)}"
+        url = f"/api/v1/dagSources/{TEST_DAG_ID}"
         response = self.client.get(
             url, headers={"Accept": "text/plain"}, 
environ_overrides={"REMOTE_USER": "test"}
         )
 
         assert 200 == response.status_code
-        assert dag_docstring in response.data.decode()
+        assert dag_content == response.data.decode()
         assert "text/plain" == response.headers["Content-Type"]
 
-    def test_should_respond_200_json(self, url_safe_serializer):
-        dagbag = DagBag(dag_folder=EXAMPLE_DAG_FILE)
-        dagbag.sync_to_db()
-        test_dag: DAG = dagbag.dags[TEST_DAG_ID]
-        dag_docstring = self._get_dag_file_docstring(test_dag.fileloc)
+    def test_should_respond_200_json(self, session, test_dag):
+        dag_content = self._get_dag_file_code(test_dag.fileloc)
 
-        url = 
f"/api/v1/dagSources/{url_safe_serializer.dumps(test_dag.fileloc)}"
+        url = f"/api/v1/dagSources/{TEST_DAG_ID}"
         response = self.client.get(
             url, headers={"Accept": "application/json"}, 
environ_overrides={"REMOTE_USER": "test"}
         )
 
         assert 200 == response.status_code
-        assert dag_docstring in response.json["content"]
+        assert response.json == {
+            "content": dag_content,
+            "dag_id": TEST_DAG_ID,
+            "version_number": 1,
+        }
         assert "application/json" == response.headers["Content-Type"]
 
+    @pytest.mark.parametrize("accept", ["application/json", "text/plain"])
+    def test_should_respond_200_version(self, accept, session, test_dag):
+        dag_content = self._get_dag_file_code(test_dag.fileloc)
+        # force reserialization
+        SerializedDagModel.write_dag(test_dag, processor_subdir="/tmp")
+        dagcode = (
+            session.query(DagCode)
+            .filter(DagCode.fileloc == test_dag.fileloc)
+            .order_by(DagCode.id.desc())
+            .first()
+        )
+        assert dagcode.dag_version.version_number == 2
+        # populate the latest dagcode with a value
+        dag_content2 = "new source code"
+        dagcode.source_code = dag_content2
+        session.merge(dagcode)
+        session.commit()
+
+        dagcodes = session.scalars(select(DagCode).where(DagCode.fileloc == 
test_dag.fileloc)).all()
+        assert len(dagcodes) == 2
+        url = f"/api/v1/dagSources/{TEST_DAG_ID}"
+        response = self.client.get(url, headers={"Accept": accept}, 
environ_overrides={"REMOTE_USER": "test"})
+
+        assert 200 == response.status_code
+        if accept == "text/plain":
+            assert dag_content2 == response.data.decode()
+            assert dag_content != response.data.decode()
+            assert "text/plain" == response.headers["Content-Type"]
+        else:
+            assert dag_content2 == response.json["content"]
+            assert dag_content != response.json["content"]
+            assert "application/json" == response.headers["Content-Type"]
+            assert response.json == {
+                "content": dag_content2,
+                "dag_id": TEST_DAG_ID,
+                "version_number": 2,
+            }
+
     def test_should_respond_404(self):
-        wrong_fileloc = "abcd1234"
-        url = f"/api/v1/dagSources/{wrong_fileloc}"
+        non_existing_dag_id = "abcd1234"
+        url = f"/api/v1/dagSources/{non_existing_dag_id}"
         response = self.client.get(
             url, headers={"Accept": "application/json"}, 
environ_overrides={"REMOTE_USER": "test"}
         )
 
         assert 404 == response.status_code
 
-    def test_should_raises_401_unauthenticated(self, url_safe_serializer):
-        dagbag = DagBag(dag_folder=EXAMPLE_DAG_FILE)
-        dagbag.sync_to_db()
-        first_dag: DAG = next(iter(dagbag.dags.values()))
-
+    def test_should_raises_401_unauthenticated(self):
         response = self.client.get(
-            
f"/api/v1/dagSources/{url_safe_serializer.dumps(first_dag.fileloc)}",
+            f"/api/v1/dagSources/{TEST_DAG_ID}",
             headers={"Accept": "text/plain"},
         )
 
         assert_401(response)
 
-    def test_should_raise_403_forbidden(self, url_safe_serializer):
-        dagbag = DagBag(dag_folder=EXAMPLE_DAG_FILE)
-        dagbag.sync_to_db()
-        first_dag: DAG = next(iter(dagbag.dags.values()))
-
+    def test_should_raise_403_forbidden(self):
         response = self.client.get(
-            
f"/api/v1/dagSources/{url_safe_serializer.dumps(first_dag.fileloc)}",
+            f"/api/v1/dagSources/{TEST_DAG_ID}",
             headers={"Accept": "text/plain"},
             environ_overrides={"REMOTE_USER": "test_no_permissions"},
         )
diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_sources.py 
b/tests/api_fastapi/core_api/routes/public/test_dag_sources.py
index 23a246073c3..d8f81482cfb 100644
--- a/tests/api_fastapi/core_api/routes/public/test_dag_sources.py
+++ b/tests/api_fastapi/core_api/routes/public/test_dag_sources.py
@@ -17,17 +17,18 @@
 
 from __future__ import annotations
 
-import ast
 import json
 import os
 
 import pytest
 from httpx import Response
+from sqlalchemy import select
 
-from airflow.models.dag import DAG
 from airflow.models.dagbag import DagBag
+from airflow.models.dagcode import DagCode
+from airflow.models.serialized_dag import SerializedDagModel
 
-from tests_common.test_utils.db import clear_db_dag_code, clear_db_dags, 
clear_db_serialized_dags
+from tests_common.test_utils.db import clear_db_dags
 
 pytestmark = pytest.mark.db_test
 
@@ -38,67 +39,99 @@ EXAMPLE_DAG_FILE = os.path.join("airflow", "example_dags", 
"example_bash_operato
 TEST_DAG_ID = "latest_only"
 
 
+@pytest.fixture
+def test_dag():
+    dagbag = DagBag(include_examples=True)
+    dagbag.sync_to_db()
+    return dagbag.dags[TEST_DAG_ID]
+
+
 class TestGetDAGSource:
     @pytest.fixture(autouse=True)
     def setup(self, url_safe_serializer) -> None:
         self.clear_db()
-        self.test_dag, self.dag_docstring = self.create_dag_source()
-        fileloc = url_safe_serializer.dumps(self.test_dag.fileloc)
-        self.dag_sources_url = f"{API_PREFIX}/{fileloc}"
 
     def teardown_method(self) -> None:
         self.clear_db()
 
     @staticmethod
-    def _get_dag_file_docstring(fileloc: str) -> str | None:
+    def _get_dag_file_code(fileloc: str) -> str | None:
         with open(fileloc) as f:
             file_contents = f.read()
-        module = ast.parse(file_contents)
-        docstring = ast.get_docstring(module)
-        return docstring
-
-    def create_dag_source(self) -> tuple[DAG, str | None]:
-        dagbag = DagBag(dag_folder=EXAMPLE_DAG_FILE)
-        dagbag.sync_to_db()
-        test_dag: DAG = dagbag.dags[TEST_DAG_ID]
-        return test_dag, self._get_dag_file_docstring(test_dag.fileloc)
+        return file_contents
 
     def clear_db(self):
         clear_db_dags()
-        clear_db_serialized_dags()
-        clear_db_dag_code()
 
-    def test_should_respond_200_text(self, test_client):
-        response: Response = test_client.get(self.dag_sources_url, 
headers={"Accept": "text/plain"})
+    def test_should_respond_200_text(self, test_client, test_dag):
+        dag_content = self._get_dag_file_code(test_dag.fileloc)
+        response: Response = test_client.get(f"{API_PREFIX}/{TEST_DAG_ID}", 
headers={"Accept": "text/plain"})
 
         assert isinstance(response, Response)
         assert 200 == response.status_code
-        assert len(self.dag_docstring) > 0
-        assert self.dag_docstring in response.content.decode()
+        assert dag_content == response.content.decode()
         with pytest.raises(json.JSONDecodeError):
             json.loads(response.content.decode())
         assert response.headers["Content-Type"].startswith("text/plain")
 
-    @pytest.mark.parametrize("headers", [{"Accept": "application/json"}, {}])
-    def test_should_respond_200_json(self, test_client, headers):
+    def test_should_respond_200_json(self, test_client, test_dag):
+        dag_content = self._get_dag_file_code(test_dag.fileloc)
         response: Response = test_client.get(
-            self.dag_sources_url,
-            headers=headers,
+            f"{API_PREFIX}/{TEST_DAG_ID}",
+            headers={"Accept": "application/json"},
         )
         assert isinstance(response, Response)
         assert 200 == response.status_code
-        assert len(self.dag_docstring) > 0
-        res_json = response.json()
-        assert isinstance(res_json, dict)
-        assert len(res_json.keys()) == 1
-        assert len(res_json["content"]) > 0
-        assert isinstance(res_json["content"], str)
-        assert self.dag_docstring in res_json["content"]
+        assert response.json() == {
+            "content": dag_content,
+            "dag_id": TEST_DAG_ID,
+            "version_number": 1,
+        }
         assert response.headers["Content-Type"].startswith("application/json")
 
+    @pytest.mark.parametrize("accept", ["application/json", "text/plain"])
+    def test_should_respond_200_version(self, test_client, accept, session, 
test_dag):
+        dag_content = self._get_dag_file_code(test_dag.fileloc)
+        # force reserialization
+        SerializedDagModel.write_dag(test_dag, processor_subdir="/tmp")
+        dagcode = (
+            session.query(DagCode)
+            .filter(DagCode.fileloc == test_dag.fileloc)
+            .order_by(DagCode.id.desc())
+            .first()
+        )
+        assert dagcode.dag_version.version_number == 2
+        # populate the latest dagcode with a value
+        dag_content2 = "new source code"
+        dagcode.source_code = dag_content2
+        session.merge(dagcode)
+        session.commit()
+
+        dagcodes = session.scalars(select(DagCode).where(DagCode.fileloc == 
test_dag.fileloc)).all()
+        assert len(dagcodes) == 2
+        url = f"{API_PREFIX}/{TEST_DAG_ID}"
+        response = test_client.get(url, headers={"Accept": accept})
+
+        assert 200 == response.status_code
+        if accept == "text/plain":
+            assert dag_content2 == response.content.decode()
+            assert dag_content != response.content.decode()
+            assert response.headers["Content-Type"].startswith("text/plain")
+        else:
+            assert dag_content2 == response.json()["content"]
+            assert dag_content != response.json()["content"]
+            assert "application/json" == response.headers["Content-Type"]
+            assert response.json() == {
+                "content": dag_content2,
+                "dag_id": TEST_DAG_ID,
+                "version_number": 2,
+            }
+
     def test_should_respond_406_unsupport_mime_type(self, test_client):
+        dagbag = DagBag(include_examples=True)
+        dagbag.sync_to_db()
         response = test_client.get(
-            self.dag_sources_url,
+            f"{API_PREFIX}/{TEST_DAG_ID}",
             headers={"Accept": "text/html"},
         )
         assert 406 == response.status_code
diff --git a/tests/models/test_dag_version.py b/tests/models/test_dag_version.py
index 42a33b4b66f..ce889b10502 100644
--- a/tests/models/test_dag_version.py
+++ b/tests/models/test_dag_version.py
@@ -42,29 +42,16 @@ class TestDagVersion:
 
         latest_version = DagVersion.get_latest_version(dag.dag_id)
         assert latest_version.version_number == 1
-        assert not latest_version.version_name
-        assert latest_version.dag_id == dag.dag_id
-
-    @pytest.mark.need_serialized_dag
-    def test_writing_dag_version_with_version_name(self, dag_maker, session):
-        version_name = "my_version"
-        with dag_maker(version_name=version_name) as dag:
-            pass
-
-        latest_version = DagVersion.get_latest_version(dag.dag_id)
-        assert latest_version.version_number == 1
-        assert latest_version.version_name == version_name
         assert latest_version.dag_id == dag.dag_id
 
     def test_writing_dag_version_with_changes(self, dag_maker, session):
         """This also tested the get_latest_version method"""
-        version_name = "my_version"
-        with dag_maker("test1", version_name=version_name) as dag:
+        with dag_maker("test1") as dag:
             EmptyOperator(task_id="task1")
         dag.sync_to_db()
         SerializedDagModel.write_dag(dag)
         # Add extra task to change the dag
-        with dag_maker("test1", version_name=version_name) as dag2:
+        with dag_maker("test1") as dag2:
             EmptyOperator(task_id="task1")
             EmptyOperator(task_id="task2")
         dag2.sync_to_db()
@@ -72,18 +59,16 @@ class TestDagVersion:
 
         latest_version = DagVersion.get_latest_version(dag.dag_id)
         assert latest_version.version_number == 2
-        assert latest_version.version_name == version_name
         assert 2 == 
session.scalar(select(func.count()).where(DagVersion.dag_id == dag.dag_id))
 
     @pytest.mark.need_serialized_dag
     def test_get_version(self, dag_maker, session):
         """The two dags have the same version name and number but different 
dag ids"""
-        version_name = "my_version"
         dag1_id = "test1"
-        with dag_maker(dag1_id, version_name=version_name):
+        with dag_maker(dag1_id):
             EmptyOperator(task_id="task1")
 
-        with dag_maker("test2", version_name=version_name):
+        with dag_maker("test2"):
             EmptyOperator(task_id="task1")
 
         with dag_maker("test3"):
@@ -91,23 +76,13 @@ class TestDagVersion:
 
         version = DagVersion.get_version(dag1_id)
         assert version.version_number == 1
-        assert version.version_name == version_name
         assert version.dag_id == dag1_id
-        assert version.version == "my_version-1"
+        assert version.version == f"{dag1_id}-1"
 
     @pytest.mark.need_serialized_dag
     def test_version_property(self, dag_maker):
-        version_name = "my_version"
-        with dag_maker("test1", version_name=version_name) as dag:
-            pass
-
-        latest_version = DagVersion.get_latest_version(dag.dag_id)
-        assert latest_version.version == f"{version_name}-1"
-
-    @pytest.mark.need_serialized_dag
-    def test_version_property_with_null_version_name(self, dag_maker):
         with dag_maker("test1") as dag:
             pass
 
         latest_version = DagVersion.get_latest_version(dag.dag_id)
-        assert latest_version.version == "1"
+        assert latest_version.version == f"{dag.dag_id}-1"
diff --git a/tests/models/test_serialized_dag.py 
b/tests/models/test_serialized_dag.py
index 599cb1396d7..698adb19879 100644
--- a/tests/models/test_serialized_dag.py
+++ b/tests/models/test_serialized_dag.py
@@ -186,7 +186,7 @@ class TestSerializedDagModel:
         ]
         DAG.bulk_write_to_db(dags)
         # we also write to dag_version and dag_code tables
-        # in dag_version, we search for unique version_name too
+        # in dag_version.
         with assert_queries_count(24):
             SDM.bulk_sync_to_db(dags)
 
diff --git a/tests_common/pytest_plugin.py b/tests_common/pytest_plugin.py
index c29cd1ea657..350073c120e 100644
--- a/tests_common/pytest_plugin.py
+++ b/tests_common/pytest_plugin.py
@@ -877,7 +877,6 @@ def dag_maker(request) -> Generator[DagMaker, None, None]:
                     dagv = DagVersion.write_dag(
                         dag_id=dag.dag_id,
                         session=self.session,
-                        version_name=dag.version_name,
                     )
                     self.session.add(dagv)
                     self.session.flush()


Reply via email to