This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0f14f66994a AIP-84: Migrating GET Dataset events for DAG runs api to 
fastAPI (#43874)
0f14f66994a is described below

commit 0f14f66994ae6960796029bf34ed871afada4dab
Author: Amogh Desai <[email protected]>
AuthorDate: Fri Nov 15 21:47:58 2024 +0530

    AIP-84: Migrating GET Dataset events for DAG runs api to fastAPI (#43874)
    
    * AIP-84: Migrating GET Assets to fastAPI
    
    * matching response to legacy
    
    * Adding unit tests - part 1
    
    * Update airflow/api_fastapi/common/parameters.py
    
    Co-authored-by: Jed Cunningham 
<[email protected]>
    
    * fixing the dag_ids filter
    
    * fixing the dag_ids filter
    
    * Adding unit tests - part 2
    
    * fixing unit tests & updating parameter type
    
    * review comments pierre
    
    * fixing last commit
    
    * fixing unit tests
    
    * AIP-84: Migrating GET Dataset events for DAG runs to fastAPI
    
    * adding test cases
    
    * adding test cases
    
    * review comments pierre
    
    * fixing unit tests
    
    * review comments pierre
    
    * review comments and fixing a test
    
    * review comments on ut
    
    ---------
    
    Co-authored-by: Jed Cunningham 
<[email protected]>
---
 .../api_connexion/endpoints/dag_run_endpoint.py    |  1 +
 airflow/api_fastapi/core_api/datamodels/assets.py  |  2 +-
 .../api_fastapi/core_api/openapi/v1-generated.yaml | 58 ++++++++++++++++-
 .../api_fastapi/core_api/routes/public/dag_run.py  | 33 ++++++++++
 airflow/ui/openapi-gen/queries/common.ts           | 22 +++++++
 airflow/ui/openapi-gen/queries/prefetch.ts         | 26 ++++++++
 airflow/ui/openapi-gen/queries/queries.ts          | 33 ++++++++++
 airflow/ui/openapi-gen/queries/suspense.ts         | 33 ++++++++++
 airflow/ui/openapi-gen/requests/schemas.gen.ts     | 11 +++-
 airflow/ui/openapi-gen/requests/services.gen.ts    | 30 +++++++++
 airflow/ui/openapi-gen/requests/types.gen.ts       | 36 ++++++++++-
 .../core_api/routes/public/test_dag_run.py         | 75 ++++++++++++++++++++++
 12 files changed, 354 insertions(+), 6 deletions(-)

diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py 
b/airflow/api_connexion/endpoints/dag_run_endpoint.py
index 7cb6d76ff2a..b8e7f36d1fd 100644
--- a/airflow/api_connexion/endpoints/dag_run_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -115,6 +115,7 @@ def get_dag_run(
         raise BadRequest("DAGRunSchema error", detail=str(e))
 
 
+@mark_fastapi_migration_done
 @security.requires_access_dag("GET", DagAccessEntity.RUN)
 @security.requires_access_asset("GET")
 @provide_session
diff --git a/airflow/api_fastapi/core_api/datamodels/assets.py 
b/airflow/api_fastapi/core_api/datamodels/assets.py
index 1295b33fbf7..3e317a4c7e3 100644
--- a/airflow/api_fastapi/core_api/datamodels/assets.py
+++ b/airflow/api_fastapi/core_api/datamodels/assets.py
@@ -73,7 +73,7 @@ class DagRunAssetReference(BaseModel):
     dag_id: str
     execution_date: datetime = Field(alias="logical_date")
     start_date: datetime
-    end_date: datetime
+    end_date: datetime | None
     state: str
     data_interval_start: datetime
     data_interval_end: datetime
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml 
b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index 23d4eeb6e40..94bb60b8efb 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -1164,6 +1164,58 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/HTTPValidationError'
+  /public/dags/{dag_id}/dagRuns/{dag_run_id}/upstreamAssetEvents:
+    get:
+      tags:
+      - DagRun
+      summary: Get Upstream Asset Events
+      description: If dag run is asset-triggered, return the asset events that 
triggered
+        it.
+      operationId: get_upstream_asset_events
+      parameters:
+      - name: dag_id
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Dag Id
+      - name: dag_run_id
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Dag Run Id
+      responses:
+        '200':
+          description: Successful Response
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/AssetEventCollectionResponse'
+        '401':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Unauthorized
+        '403':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Forbidden
+        '404':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Not Found
+        '422':
+          description: Validation Error
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPValidationError'
   /public/dags/{dag_id}/dagRuns/{dag_run_id}/clear:
     post:
       tags:
@@ -4919,8 +4971,10 @@ components:
           format: date-time
           title: Start Date
         end_date:
-          type: string
-          format: date-time
+          anyOf:
+          - type: string
+            format: date-time
+          - type: 'null'
           title: End Date
         state:
           type: string
diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py 
b/airflow/api_fastapi/core_api/routes/public/dag_run.py
index d95cf76f69a..decc7ff2b28 100644
--- a/airflow/api_fastapi/core_api/routes/public/dag_run.py
+++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -30,6 +30,7 @@ from airflow.api.common.mark_tasks import (
 )
 from airflow.api_fastapi.common.db.common import get_session
 from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.datamodels.assets import 
AssetEventCollectionResponse, AssetEventResponse
 from airflow.api_fastapi.core_api.datamodels.dag_run import (
     DAGRunClearBody,
     DAGRunPatchBody,
@@ -149,6 +150,38 @@ def patch_dag_run(
     return DAGRunResponse.model_validate(dag_run, from_attributes=True)
 
 
+@dag_run_router.get(
+    "/{dag_run_id}/upstreamAssetEvents",
+    responses=create_openapi_http_exception_doc(
+        [
+            status.HTTP_404_NOT_FOUND,
+        ]
+    ),
+)
+def get_upstream_asset_events(
+    dag_id: str, dag_run_id: str, session: Annotated[Session, 
Depends(get_session)]
+) -> AssetEventCollectionResponse:
+    """If dag run is asset-triggered, return the asset events that triggered 
it."""
+    dag_run: DagRun | None = session.scalar(
+        select(DagRun).where(
+            DagRun.dag_id == dag_id,
+            DagRun.run_id == dag_run_id,
+        )
+    )
+    if dag_run is None:
+        raise HTTPException(
+            status.HTTP_404_NOT_FOUND,
+            f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` 
was not found",
+        )
+    events = dag_run.consumed_asset_events
+    return AssetEventCollectionResponse(
+        asset_events=[
+            AssetEventResponse.model_validate(asset_event, 
from_attributes=True) for asset_event in events
+        ],
+        total_entries=len(events),
+    )
+
+
 @dag_run_router.post(
     "/{dag_run_id}/clear", 
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND])
 )
diff --git a/airflow/ui/openapi-gen/queries/common.ts 
b/airflow/ui/openapi-gen/queries/common.ts
index 60464a7580e..6175179d11c 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -305,6 +305,28 @@ export const UseDagRunServiceGetDagRunKeyFn = (
   },
   queryKey?: Array<unknown>,
 ) => [useDagRunServiceGetDagRunKey, ...(queryKey ?? [{ dagId, dagRunId }])];
+export type DagRunServiceGetUpstreamAssetEventsDefaultResponse = Awaited<
+  ReturnType<typeof DagRunService.getUpstreamAssetEvents>
+>;
+export type DagRunServiceGetUpstreamAssetEventsQueryResult<
+  TData = DagRunServiceGetUpstreamAssetEventsDefaultResponse,
+  TError = unknown,
+> = UseQueryResult<TData, TError>;
+export const useDagRunServiceGetUpstreamAssetEventsKey =
+  "DagRunServiceGetUpstreamAssetEvents";
+export const UseDagRunServiceGetUpstreamAssetEventsKeyFn = (
+  {
+    dagId,
+    dagRunId,
+  }: {
+    dagId: string;
+    dagRunId: string;
+  },
+  queryKey?: Array<unknown>,
+) => [
+  useDagRunServiceGetUpstreamAssetEventsKey,
+  ...(queryKey ?? [{ dagId, dagRunId }]),
+];
 export type DagSourceServiceGetDagSourceDefaultResponse = Awaited<
   ReturnType<typeof DagSourceService.getDagSource>
 >;
diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts 
b/airflow/ui/openapi-gen/queries/prefetch.ts
index ac1cd93db37..4c541670258 100644
--- a/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -386,6 +386,32 @@ export const prefetchUseDagRunServiceGetDagRun = (
     queryKey: Common.UseDagRunServiceGetDagRunKeyFn({ dagId, dagRunId }),
     queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }),
   });
+/**
+ * Get Upstream Asset Events
+ * If dag run is asset-triggered, return the asset events that triggered it.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.dagRunId
+ * @returns AssetEventCollectionResponse Successful Response
+ * @throws ApiError
+ */
+export const prefetchUseDagRunServiceGetUpstreamAssetEvents = (
+  queryClient: QueryClient,
+  {
+    dagId,
+    dagRunId,
+  }: {
+    dagId: string;
+    dagRunId: string;
+  },
+) =>
+  queryClient.prefetchQuery({
+    queryKey: Common.UseDagRunServiceGetUpstreamAssetEventsKeyFn({
+      dagId,
+      dagRunId,
+    }),
+    queryFn: () => DagRunService.getUpstreamAssetEvents({ dagId, dagRunId }),
+  });
 /**
  * Get Dag Source
  * Get source code using file token.
diff --git a/airflow/ui/openapi-gen/queries/queries.ts 
b/airflow/ui/openapi-gen/queries/queries.ts
index 68a31b0a7a1..3c6a426ee07 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -485,6 +485,39 @@ export const useDagRunServiceGetDagRun = <
     queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }) as TData,
     ...options,
   });
+/**
+ * Get Upstream Asset Events
+ * If dag run is asset-triggered, return the asset events that triggered it.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.dagRunId
+ * @returns AssetEventCollectionResponse Successful Response
+ * @throws ApiError
+ */
+export const useDagRunServiceGetUpstreamAssetEvents = <
+  TData = Common.DagRunServiceGetUpstreamAssetEventsDefaultResponse,
+  TError = unknown,
+  TQueryKey extends Array<unknown> = unknown[],
+>(
+  {
+    dagId,
+    dagRunId,
+  }: {
+    dagId: string;
+    dagRunId: string;
+  },
+  queryKey?: TQueryKey,
+  options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+  useQuery<TData, TError>({
+    queryKey: Common.UseDagRunServiceGetUpstreamAssetEventsKeyFn(
+      { dagId, dagRunId },
+      queryKey,
+    ),
+    queryFn: () =>
+      DagRunService.getUpstreamAssetEvents({ dagId, dagRunId }) as TData,
+    ...options,
+  });
 /**
  * Get Dag Source
  * Get source code using file token.
diff --git a/airflow/ui/openapi-gen/queries/suspense.ts 
b/airflow/ui/openapi-gen/queries/suspense.ts
index 0c162cc42cd..43331b187fe 100644
--- a/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow/ui/openapi-gen/queries/suspense.ts
@@ -469,6 +469,39 @@ export const useDagRunServiceGetDagRunSuspense = <
     queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }) as TData,
     ...options,
   });
+/**
+ * Get Upstream Asset Events
+ * If dag run is asset-triggered, return the asset events that triggered it.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.dagRunId
+ * @returns AssetEventCollectionResponse Successful Response
+ * @throws ApiError
+ */
+export const useDagRunServiceGetUpstreamAssetEventsSuspense = <
+  TData = Common.DagRunServiceGetUpstreamAssetEventsDefaultResponse,
+  TError = unknown,
+  TQueryKey extends Array<unknown> = unknown[],
+>(
+  {
+    dagId,
+    dagRunId,
+  }: {
+    dagId: string;
+    dagRunId: string;
+  },
+  queryKey?: TQueryKey,
+  options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+  useSuspenseQuery<TData, TError>({
+    queryKey: Common.UseDagRunServiceGetUpstreamAssetEventsKeyFn(
+      { dagId, dagRunId },
+      queryKey,
+    ),
+    queryFn: () =>
+      DagRunService.getUpstreamAssetEvents({ dagId, dagRunId }) as TData,
+    ...options,
+  });
 /**
  * Get Dag Source
  * Get source code using file token.
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts 
b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 61007167cdd..17b4cf78721 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -1936,8 +1936,15 @@ export const $DagRunAssetReference = {
       title: "Start Date",
     },
     end_date: {
-      type: "string",
-      format: "date-time",
+      anyOf: [
+        {
+          type: "string",
+          format: "date-time",
+        },
+        {
+          type: "null",
+        },
+      ],
       title: "End Date",
     },
     state: {
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts 
b/airflow/ui/openapi-gen/requests/services.gen.ts
index 451eac3365d..1ebbac6aa6c 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -45,6 +45,8 @@ import type {
   DeleteDagRunResponse,
   PatchDagRunData,
   PatchDagRunResponse,
+  GetUpstreamAssetEventsData,
+  GetUpstreamAssetEventsResponse,
   ClearDagRunData,
   ClearDagRunResponse,
   GetDagSourceData,
@@ -740,6 +742,34 @@ export class DagRunService {
     });
   }
 
+  /**
+   * Get Upstream Asset Events
+   * If dag run is asset-triggered, return the asset events that triggered it.
+   * @param data The data for the request.
+   * @param data.dagId
+   * @param data.dagRunId
+   * @returns AssetEventCollectionResponse Successful Response
+   * @throws ApiError
+   */
+  public static getUpstreamAssetEvents(
+    data: GetUpstreamAssetEventsData,
+  ): CancelablePromise<GetUpstreamAssetEventsResponse> {
+    return __request(OpenAPI, {
+      method: "GET",
+      url: "/public/dags/{dag_id}/dagRuns/{dag_run_id}/upstreamAssetEvents",
+      path: {
+        dag_id: data.dagId,
+        dag_run_id: data.dagRunId,
+      },
+      errors: {
+        401: "Unauthorized",
+        403: "Forbidden",
+        404: "Not Found",
+        422: "Validation Error",
+      },
+    });
+  }
+
   /**
    * Clear Dag Run
    * @param data The data for the request.
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts 
b/airflow/ui/openapi-gen/requests/types.gen.ts
index 0b221ab4ae7..05b6e84adfc 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -426,7 +426,7 @@ export type DagRunAssetReference = {
   dag_id: string;
   logical_date: string;
   start_date: string;
-  end_date: string;
+  end_date: string | null;
   state: string;
   data_interval_start: string;
   data_interval_end: string;
@@ -1129,6 +1129,13 @@ export type PatchDagRunData = {
 
 export type PatchDagRunResponse = DAGRunResponse;
 
+export type GetUpstreamAssetEventsData = {
+  dagId: string;
+  dagRunId: string;
+};
+
+export type GetUpstreamAssetEventsResponse = AssetEventCollectionResponse;
+
 export type ClearDagRunData = {
   dagId: string;
   dagRunId: string;
@@ -2010,6 +2017,33 @@ export type $OpenApiTs = {
       };
     };
   };
+  "/public/dags/{dag_id}/dagRuns/{dag_run_id}/upstreamAssetEvents": {
+    get: {
+      req: GetUpstreamAssetEventsData;
+      res: {
+        /**
+         * Successful Response
+         */
+        200: AssetEventCollectionResponse;
+        /**
+         * Unauthorized
+         */
+        401: HTTPExceptionResponse;
+        /**
+         * Forbidden
+         */
+        403: HTTPExceptionResponse;
+        /**
+         * Not Found
+         */
+        404: HTTPExceptionResponse;
+        /**
+         * Validation Error
+         */
+        422: HTTPValidationError;
+      };
+    };
+  };
   "/public/dags/{dag_id}/dagRuns/{dag_run_id}/clear": {
     post: {
       req: ClearDagRunData;
diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py 
b/tests/api_fastapi/core_api/routes/public/test_dag_run.py
index f6ee3d2dee9..7d28a9237e3 100644
--- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -22,7 +22,9 @@ from datetime import datetime, timezone
 import pytest
 from sqlalchemy import select
 
+from airflow import Asset
 from airflow.models import DagRun
+from airflow.models.asset import AssetEvent, AssetModel
 from airflow.operators.empty import EmptyOperator
 from airflow.utils.session import provide_session
 from airflow.utils.state import DagRunState, State
@@ -51,6 +53,7 @@ DAG1_RUN2_TRIGGERED_BY = DagRunTriggeredByType.ASSET
 DAG2_RUN1_TRIGGERED_BY = DagRunTriggeredByType.CLI
 DAG2_RUN2_TRIGGERED_BY = DagRunTriggeredByType.REST_API
 START_DATE = datetime(2024, 6, 15, 0, 0, tzinfo=timezone.utc)
+END_DATE = datetime(2024, 6, 15, 0, 0, tzinfo=timezone.utc)
 EXECUTION_DATE = datetime(2024, 6, 16, 0, 0, tzinfo=timezone.utc)
 DAG1_RUN1_NOTE = "test_note"
 
@@ -264,6 +267,78 @@ class TestDeleteDagRun:
         assert body["detail"] == "The DagRun with dag_id: `test_dag1` and 
run_id: `invalid` was not found"
 
 
+class TestGetDagRunAssetTriggerEvents:
+    def test_should_respond_200(self, test_client, dag_maker, session):
+        asset1 = Asset(uri="ds1")
+
+        with dag_maker(dag_id="source_dag", start_date=START_DATE, 
session=session):
+            EmptyOperator(task_id="task", outlets=[asset1])
+        dr = dag_maker.create_dagrun()
+        ti = dr.task_instances[0]
+
+        asset1_id = 
session.query(AssetModel.id).filter_by(uri=asset1.uri).scalar()
+        event = AssetEvent(
+            asset_id=asset1_id,
+            source_task_id=ti.task_id,
+            source_dag_id=ti.dag_id,
+            source_run_id=ti.run_id,
+            source_map_index=ti.map_index,
+        )
+        session.add(event)
+
+        with dag_maker(dag_id="TEST_DAG_ID", start_date=START_DATE, 
session=session):
+            pass
+        dr = dag_maker.create_dagrun(run_id="TEST_DAG_RUN_ID", 
run_type=DagRunType.ASSET_TRIGGERED)
+        dr.consumed_asset_events.append(event)
+
+        session.commit()
+        assert event.timestamp
+
+        response = test_client.get(
+            
"/public/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/upstreamAssetEvents",
+        )
+        assert response.status_code == 200
+        expected_response = {
+            "asset_events": [
+                {
+                    "timestamp": event.timestamp.isoformat().replace("+00:00", 
"Z"),
+                    "asset_id": asset1_id,
+                    "uri": asset1.uri,
+                    "extra": {},
+                    "id": event.id,
+                    "source_dag_id": ti.dag_id,
+                    "source_map_index": ti.map_index,
+                    "source_run_id": ti.run_id,
+                    "source_task_id": ti.task_id,
+                    "created_dagruns": [
+                        {
+                            "dag_id": "TEST_DAG_ID",
+                            "run_id": "TEST_DAG_RUN_ID",
+                            "data_interval_end": 
dr.data_interval_end.isoformat().replace("+00:00", "Z"),
+                            "data_interval_start": 
dr.data_interval_start.isoformat().replace("+00:00", "Z"),
+                            "end_date": None,
+                            "logical_date": 
dr.logical_date.isoformat().replace("+00:00", "Z"),
+                            "start_date": 
dr.start_date.isoformat().replace("+00:00", "Z"),
+                            "state": "running",
+                        }
+                    ],
+                }
+            ],
+            "total_entries": 1,
+        }
+        assert response.json() == expected_response
+
+    def test_should_respond_404(self, test_client):
+        response = test_client.get(
+            
"public/dags/invalid-id/dagRuns/invalid-run-id/upstreamAssetEvents",
+        )
+        assert response.status_code == 404
+        assert (
+            "The DagRun with dag_id: `invalid-id` and run_id: `invalid-run-id` 
was not found"
+            == response.json()["detail"]
+        )
+
+
 class TestClearDagRun:
     def test_clear_dag_run(self, test_client):
         response = test_client.post(

Reply via email to