This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9a364acc1b8 AIP 84: Migrate GET ASSET EVENTS legacy API to fast API  
(#43881)
9a364acc1b8 is described below

commit 9a364acc1b860a0551eb2aa31b09b09d3a739fd8
Author: vatsrahul1001 <[email protected]>
AuthorDate: Thu Nov 14 15:29:53 2024 +0530

    AIP 84: Migrate GET ASSET EVENTS legacy API to fast API  (#43881)
    
    * AIP-84: Migrating GET Assets to fastAPI
    
    * matching response to legacy
    
    * Adding unit tests - part 1
    
    * Update airflow/api_fastapi/common/parameters.py
    
    Co-authored-by: Jed Cunningham 
<[email protected]>
    
    * fixing the dag_ids filter
    
    * fixing the dag_ids filter
    
    * Adding unit tests - part 2
    
    * fixing unit tests & updating parameter type
    
    * review comments pierre
    
    * fixing last commit
    
    * fixing unit tests
    
    * migrating get assets events endpoint to fastapi
    
    * fixing test response
    
    * Adding tests for filtering
    
    * address review comments
    
    * fixing test parametrize
    
    * address review comments
    
    * address review comments
    
    * removing http 401 and 403 as its now added in  root router in #43932
    
    ---------
    
    Co-authored-by: Amogh <[email protected]>
    Co-authored-by: Jed Cunningham 
<[email protected]>
---
 airflow/api_connexion/endpoints/asset_endpoint.py  |   1 +
 airflow/api_fastapi/common/parameters.py           |  97 +++++++++-
 airflow/api_fastapi/core_api/datamodels/assets.py  |  37 +++-
 .../api_fastapi/core_api/openapi/v1-generated.yaml | 212 +++++++++++++++++++++
 .../api_fastapi/core_api/routes/public/assets.py   |  64 ++++++-
 airflow/ui/openapi-gen/queries/common.ts           |  44 +++++
 airflow/ui/openapi-gen/queries/prefetch.ts         |  60 ++++++
 airflow/ui/openapi-gen/queries/queries.ts          |  69 +++++++
 airflow/ui/openapi-gen/queries/suspense.ts         |  69 +++++++
 airflow/ui/openapi-gen/requests/schemas.gen.ts     | 163 ++++++++++++++++
 airflow/ui/openapi-gen/requests/services.gen.ts    |  42 ++++
 airflow/ui/openapi-gen/requests/types.gen.ts       |  80 ++++++++
 .../core_api/routes/public/test_assets.py          | 192 ++++++++++++++++++-
 13 files changed, 1124 insertions(+), 6 deletions(-)

diff --git a/airflow/api_connexion/endpoints/asset_endpoint.py 
b/airflow/api_connexion/endpoints/asset_endpoint.py
index 0c45ddd7095..83722520959 100644
--- a/airflow/api_connexion/endpoints/asset_endpoint.py
+++ b/airflow/api_connexion/endpoints/asset_endpoint.py
@@ -115,6 +115,7 @@ def get_assets(
     return asset_collection_schema.dump(AssetCollection(assets=assets, 
total_entries=total_entries))
 
 
+@mark_fastapi_migration_done
 @security.requires_access_asset("GET")
 @provide_session
 @format_parameters({"limit": check_limit})
diff --git a/airflow/api_fastapi/common/parameters.py 
b/airflow/api_fastapi/common/parameters.py
index c1d7624b37a..abf6378ac5b 100644
--- a/airflow/api_fastapi/common/parameters.py
+++ b/airflow/api_fastapi/common/parameters.py
@@ -29,7 +29,7 @@ from sqlalchemy.inspection import inspect
 
 from airflow.api_connexion.endpoints.task_instance_endpoint import 
_convert_ti_states
 from airflow.models import Base, Connection
-from airflow.models.asset import AssetModel, DagScheduleAssetReference, 
TaskOutletAssetReference
+from airflow.models.asset import AssetEvent, AssetModel, 
DagScheduleAssetReference, TaskOutletAssetReference
 from airflow.models.dag import DagModel, DagTag
 from airflow.models.dagrun import DagRun
 from airflow.models.dagwarning import DagWarning, DagWarningType
@@ -440,6 +440,86 @@ class _DagIdAssetReferenceFilter(BaseParam[list[str]]):
         )
 
 
+class _AssetIdFilter(BaseParam[int]):
+    """Filter on asset_id."""
+
+    def __init__(self, attribute: ColumnElement, skip_none: bool = True) -> 
None:
+        super().__init__(skip_none=skip_none)
+        self.attribute = attribute
+
+    def to_orm(self, select: Select) -> Select:
+        if self.value is None and self.skip_none:
+            return select
+        return select.where(self.attribute == self.value)
+
+    def depends(self, asset_id: int | None = None) -> _AssetIdFilter:
+        return self.set_value(asset_id)
+
+
+class _SourceDagIdFilter(BaseParam[str]):
+    """Filter on source_dag_id."""
+
+    def __init__(self, attribute: ColumnElement, skip_none: bool = True) -> 
None:
+        super().__init__(skip_none=skip_none)
+        self.attribute = attribute
+
+    def to_orm(self, select: Select) -> Select:
+        if self.value is None and self.skip_none:
+            return select
+        return select.where(self.attribute == self.value)
+
+    def depends(self, source_dag_id: str | None = None) -> _SourceDagIdFilter:
+        return self.set_value(source_dag_id)
+
+
+class _SourceTaskIdFilter(BaseParam[str]):
+    """Filter on source_task_id."""
+
+    def __init__(self, attribute: ColumnElement, skip_none: bool = True) -> 
None:
+        super().__init__(skip_none=skip_none)
+        self.attribute = attribute
+
+    def to_orm(self, select: Select) -> Select:
+        if self.value is None and self.skip_none:
+            return select
+        return select.where(self.attribute == self.value)
+
+    def depends(self, source_task_id: str | None = None) -> 
_SourceTaskIdFilter:
+        return self.set_value(source_task_id)
+
+
+class _SourceRunIdFilter(BaseParam[str]):
+    """filter on source_run_id."""
+
+    def __init__(self, attribute: ColumnElement, skip_none: bool = True) -> 
None:
+        super().__init__(skip_none=skip_none)
+        self.attribute = attribute
+
+    def to_orm(self, select: Select) -> Select:
+        if self.value is None and self.skip_none:
+            return select
+        return select.where(self.attribute == self.value)
+
+    def depends(self, source_run_id: str | None = None) -> _SourceRunIdFilter:
+        return self.set_value(source_run_id)
+
+
+class _SourceMapIndexFilter(BaseParam[int]):
+    """Filter on source_map_index."""
+
+    def __init__(self, attribute: ColumnElement, skip_none: bool = True) -> 
None:
+        super().__init__(skip_none=skip_none)
+        self.attribute = attribute
+
+    def to_orm(self, select: Select) -> Select:
+        if self.value is None and self.skip_none:
+            return select
+        return select.where(self.attribute == self.value)
+
+    def depends(self, source_map_index: int | None = None) -> 
_SourceMapIndexFilter:
+        return self.set_value(source_map_index)
+
+
 class Range(BaseModel, Generic[T]):
     """Range with a lower and upper bound."""
 
@@ -537,3 +617,18 @@ QueryUriPatternSearch = Annotated[_UriPatternSearch, 
Depends(_UriPatternSearch()
 QueryAssetDagIdPatternSearch = Annotated[
     _DagIdAssetReferenceFilter, Depends(_DagIdAssetReferenceFilter().depends)
 ]
+QueryAssetIdFilter = Annotated[_AssetIdFilter, 
Depends(_AssetIdFilter(AssetEvent.asset_id).depends)]
+
+
+QuerySourceDagIdFilter = Annotated[
+    _SourceDagIdFilter, 
Depends(_SourceDagIdFilter(AssetEvent.source_dag_id).depends)
+]
+QuerySourceTaskIdFilter = Annotated[
+    _SourceTaskIdFilter, 
Depends(_SourceTaskIdFilter(AssetEvent.source_task_id).depends)
+]
+QuerySourceRunIdFilter = Annotated[
+    _SourceRunIdFilter, 
Depends(_SourceRunIdFilter(AssetEvent.source_run_id).depends)
+]
+QuerySourceMapIndexFilter = Annotated[
+    _SourceMapIndexFilter, 
Depends(_SourceMapIndexFilter(AssetEvent.source_map_index).depends)
+]
diff --git a/airflow/api_fastapi/core_api/datamodels/assets.py 
b/airflow/api_fastapi/core_api/datamodels/assets.py
index 498dc4edae6..85e41ff7b56 100644
--- a/airflow/api_fastapi/core_api/datamodels/assets.py
+++ b/airflow/api_fastapi/core_api/datamodels/assets.py
@@ -19,7 +19,7 @@ from __future__ import annotations
 
 from datetime import datetime
 
-from pydantic import BaseModel
+from pydantic import BaseModel, Field
 
 
 class DagScheduleAssetReference(BaseModel):
@@ -64,3 +64,38 @@ class AssetCollectionResponse(BaseModel):
 
     assets: list[AssetResponse]
     total_entries: int
+
+
+class DagRunAssetReference(BaseModel):
+    """DAGRun serializer for asset responses."""
+
+    run_id: str
+    dag_id: str
+    execution_date: datetime = Field(alias="logical_date")
+    start_date: datetime
+    end_date: datetime
+    state: str
+    data_interval_start: datetime
+    data_interval_end: datetime
+
+
+class AssetEventResponse(BaseModel):
+    """Asset event serializer for responses."""
+
+    id: int
+    asset_id: int
+    uri: str
+    extra: dict | None = None
+    source_task_id: str | None = None
+    source_dag_id: str | None = None
+    source_run_id: str | None = None
+    source_map_index: int
+    created_dagruns: list[DagRunAssetReference]
+    timestamp: datetime
+
+
+class AssetEventCollectionResponse(BaseModel):
+    """Asset event collection response."""
+
+    asset_events: list[AssetEventResponse]
+    total_entries: int
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml 
b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index 6efa1e44585..b99b389de51 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -246,6 +246,106 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/HTTPValidationError'
+  /public/assets/events:
+    get:
+      tags:
+      - Asset
+      summary: Get Asset Events
+      description: Get asset events.
+      operationId: get_asset_events
+      parameters:
+      - name: limit
+        in: query
+        required: false
+        schema:
+          type: integer
+          default: 100
+          title: Limit
+      - name: offset
+        in: query
+        required: false
+        schema:
+          type: integer
+          default: 0
+          title: Offset
+      - name: order_by
+        in: query
+        required: false
+        schema:
+          type: string
+          default: timestamp
+          title: Order By
+      - name: asset_id
+        in: query
+        required: false
+        schema:
+          anyOf:
+          - type: integer
+          - type: 'null'
+          title: Asset Id
+      - name: source_dag_id
+        in: query
+        required: false
+        schema:
+          anyOf:
+          - type: string
+          - type: 'null'
+          title: Source Dag Id
+      - name: source_task_id
+        in: query
+        required: false
+        schema:
+          anyOf:
+          - type: string
+          - type: 'null'
+          title: Source Task Id
+      - name: source_run_id
+        in: query
+        required: false
+        schema:
+          anyOf:
+          - type: string
+          - type: 'null'
+          title: Source Run Id
+      - name: source_map_index
+        in: query
+        required: false
+        schema:
+          anyOf:
+          - type: integer
+          - type: 'null'
+          title: Source Map Index
+      responses:
+        '200':
+          description: Successful Response
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/AssetEventCollectionResponse'
+        '401':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Unauthorized
+        '403':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Forbidden
+        '404':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Not Found
+        '422':
+          description: Validation Error
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPValidationError'
   /public/assets/{uri}:
     get:
       tags:
@@ -3567,6 +3667,75 @@ components:
       - total_entries
       title: AssetCollectionResponse
       description: Asset collection response.
+    AssetEventCollectionResponse:
+      properties:
+        asset_events:
+          items:
+            $ref: '#/components/schemas/AssetEventResponse'
+          type: array
+          title: Asset Events
+        total_entries:
+          type: integer
+          title: Total Entries
+      type: object
+      required:
+      - asset_events
+      - total_entries
+      title: AssetEventCollectionResponse
+      description: Asset event collection response.
+    AssetEventResponse:
+      properties:
+        id:
+          type: integer
+          title: Id
+        asset_id:
+          type: integer
+          title: Asset Id
+        uri:
+          type: string
+          title: Uri
+        extra:
+          anyOf:
+          - type: object
+          - type: 'null'
+          title: Extra
+        source_task_id:
+          anyOf:
+          - type: string
+          - type: 'null'
+          title: Source Task Id
+        source_dag_id:
+          anyOf:
+          - type: string
+          - type: 'null'
+          title: Source Dag Id
+        source_run_id:
+          anyOf:
+          - type: string
+          - type: 'null'
+          title: Source Run Id
+        source_map_index:
+          type: integer
+          title: Source Map Index
+        created_dagruns:
+          items:
+            $ref: '#/components/schemas/DagRunAssetReference'
+          type: array
+          title: Created Dagruns
+        timestamp:
+          type: string
+          format: date-time
+          title: Timestamp
+      type: object
+      required:
+      - id
+      - asset_id
+      - uri
+      - source_map_index
+      - created_dagruns
+      - timestamp
+      title: AssetEventResponse
+      description: Asset event serializer for responses.
     AssetResponse:
       properties:
         id:
@@ -4631,6 +4800,49 @@ components:
       - latest_dag_processor_heartbeat
       title: DagProcessorInfoSchema
       description: Schema for DagProcessor info.
+    DagRunAssetReference:
+      properties:
+        run_id:
+          type: string
+          title: Run Id
+        dag_id:
+          type: string
+          title: Dag Id
+        logical_date:
+          type: string
+          format: date-time
+          title: Logical Date
+        start_date:
+          type: string
+          format: date-time
+          title: Start Date
+        end_date:
+          type: string
+          format: date-time
+          title: End Date
+        state:
+          type: string
+          title: State
+        data_interval_start:
+          type: string
+          format: date-time
+          title: Data Interval Start
+        data_interval_end:
+          type: string
+          format: date-time
+          title: Data Interval End
+      type: object
+      required:
+      - run_id
+      - dag_id
+      - logical_date
+      - start_date
+      - end_date
+      - state
+      - data_interval_start
+      - data_interval_end
+      title: DagRunAssetReference
+      description: DAGRun serializer for asset responses.
     DagRunState:
       type: string
       enum:
diff --git a/airflow/api_fastapi/core_api/routes/public/assets.py 
b/airflow/api_fastapi/core_api/routes/public/assets.py
index d093eef3720..67218c47161 100644
--- a/airflow/api_fastapi/core_api/routes/public/assets.py
+++ b/airflow/api_fastapi/core_api/routes/public/assets.py
@@ -26,15 +26,25 @@ from sqlalchemy.orm import Session, joinedload, subqueryload
 from airflow.api_fastapi.common.db.common import get_session, paginated_select
 from airflow.api_fastapi.common.parameters import (
     QueryAssetDagIdPatternSearch,
+    QueryAssetIdFilter,
     QueryLimit,
     QueryOffset,
+    QuerySourceDagIdFilter,
+    QuerySourceMapIndexFilter,
+    QuerySourceRunIdFilter,
+    QuerySourceTaskIdFilter,
     QueryUriPatternSearch,
     SortParam,
 )
 from airflow.api_fastapi.common.router import AirflowRouter
-from airflow.api_fastapi.core_api.datamodels.assets import 
AssetCollectionResponse, AssetResponse
+from airflow.api_fastapi.core_api.datamodels.assets import (
+    AssetCollectionResponse,
+    AssetEventCollectionResponse,
+    AssetEventResponse,
+    AssetResponse,
+)
 from airflow.api_fastapi.core_api.openapi.exceptions import 
create_openapi_http_exception_doc
-from airflow.models.asset import AssetModel
+from airflow.models.asset import AssetEvent, AssetModel
 
 assets_router = AirflowRouter(tags=["Asset"], prefix="/assets")
 
@@ -74,6 +84,56 @@ def get_assets(
     )
 
 
+@assets_router.get(
+    "/events",
+    responses=create_openapi_http_exception_doc([404]),
+)
+def get_asset_events(
+    limit: QueryLimit,
+    offset: QueryOffset,
+    order_by: Annotated[
+        SortParam,
+        Depends(
+            SortParam(
+                [
+                    "source_task_id",
+                    "source_dag_id",
+                    "source_run_id",
+                    "source_map_index",
+                    "timestamp",
+                ],
+                AssetEvent,
+            ).dynamic_depends("timestamp")
+        ),
+    ],
+    asset_id: QueryAssetIdFilter,
+    source_dag_id: QuerySourceDagIdFilter,
+    source_task_id: QuerySourceTaskIdFilter,
+    source_run_id: QuerySourceRunIdFilter,
+    source_map_index: QuerySourceMapIndexFilter,
+    session: Annotated[Session, Depends(get_session)],
+) -> AssetEventCollectionResponse:
+    """Get asset events."""
+    assets_event_select, total_entries = paginated_select(
+        select(AssetEvent),
+        filters=[asset_id, source_dag_id, source_task_id, source_run_id, 
source_map_index],
+        order_by=order_by,
+        offset=offset,
+        limit=limit,
+        session=session,
+    )
+
+    assets_event_select = 
assets_event_select.options(subqueryload(AssetEvent.created_dagruns))
+    assets_events = session.scalars(assets_event_select).all()
+
+    return AssetEventCollectionResponse(
+        asset_events=[
+            AssetEventResponse.model_validate(asset, from_attributes=True) for 
asset in assets_events
+        ],
+        total_entries=total_entries,
+    )
+
+
 @assets_router.get(
     "/{uri:path}",
     responses=create_openapi_http_exception_doc([401, 403, 404]),
diff --git a/airflow/ui/openapi-gen/queries/common.ts 
b/airflow/ui/openapi-gen/queries/common.ts
index 1fff143182f..bc3cabf3792 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -69,6 +69,50 @@ export const UseAssetServiceGetAssetsKeyFn = (
   useAssetServiceGetAssetsKey,
   ...(queryKey ?? [{ dagIds, limit, offset, orderBy, uriPattern }]),
 ];
+export type AssetServiceGetAssetEventsDefaultResponse = Awaited<
+  ReturnType<typeof AssetService.getAssetEvents>
+>;
+export type AssetServiceGetAssetEventsQueryResult<
+  TData = AssetServiceGetAssetEventsDefaultResponse,
+  TError = unknown,
+> = UseQueryResult<TData, TError>;
+export const useAssetServiceGetAssetEventsKey = "AssetServiceGetAssetEvents";
+export const UseAssetServiceGetAssetEventsKeyFn = (
+  {
+    assetId,
+    limit,
+    offset,
+    orderBy,
+    sourceDagId,
+    sourceMapIndex,
+    sourceRunId,
+    sourceTaskId,
+  }: {
+    assetId?: number;
+    limit?: number;
+    offset?: number;
+    orderBy?: string;
+    sourceDagId?: string;
+    sourceMapIndex?: number;
+    sourceRunId?: string;
+    sourceTaskId?: string;
+  } = {},
+  queryKey?: Array<unknown>,
+) => [
+  useAssetServiceGetAssetEventsKey,
+  ...(queryKey ?? [
+    {
+      assetId,
+      limit,
+      offset,
+      orderBy,
+      sourceDagId,
+      sourceMapIndex,
+      sourceRunId,
+      sourceTaskId,
+    },
+  ]),
+];
 export type AssetServiceGetAssetDefaultResponse = Awaited<
   ReturnType<typeof AssetService.getAsset>
 >;
diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts 
b/airflow/ui/openapi-gen/queries/prefetch.ts
index b822e35dd33..f5f120e5555 100644
--- a/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -85,6 +85,66 @@ export const prefetchUseAssetServiceGetAssets = (
     queryFn: () =>
       AssetService.getAssets({ dagIds, limit, offset, orderBy, uriPattern }),
   });
+/**
+ * Get Asset Events
+ * Get asset events.
+ * @param data The data for the request.
+ * @param data.limit
+ * @param data.offset
+ * @param data.orderBy
+ * @param data.assetId
+ * @param data.sourceDagId
+ * @param data.sourceTaskId
+ * @param data.sourceRunId
+ * @param data.sourceMapIndex
+ * @returns AssetEventCollectionResponse Successful Response
+ * @throws ApiError
+ */
+export const prefetchUseAssetServiceGetAssetEvents = (
+  queryClient: QueryClient,
+  {
+    assetId,
+    limit,
+    offset,
+    orderBy,
+    sourceDagId,
+    sourceMapIndex,
+    sourceRunId,
+    sourceTaskId,
+  }: {
+    assetId?: number;
+    limit?: number;
+    offset?: number;
+    orderBy?: string;
+    sourceDagId?: string;
+    sourceMapIndex?: number;
+    sourceRunId?: string;
+    sourceTaskId?: string;
+  } = {},
+) =>
+  queryClient.prefetchQuery({
+    queryKey: Common.UseAssetServiceGetAssetEventsKeyFn({
+      assetId,
+      limit,
+      offset,
+      orderBy,
+      sourceDagId,
+      sourceMapIndex,
+      sourceRunId,
+      sourceTaskId,
+    }),
+    queryFn: () =>
+      AssetService.getAssetEvents({
+        assetId,
+        limit,
+        offset,
+        orderBy,
+        sourceDagId,
+        sourceMapIndex,
+        sourceRunId,
+        sourceTaskId,
+      }),
+  });
 /**
  * Get Asset
  * Get an asset.
diff --git a/airflow/ui/openapi-gen/queries/queries.ts 
b/airflow/ui/openapi-gen/queries/queries.ts
index 475126ef506..f16ebde095f 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -115,6 +115,75 @@ export const useAssetServiceGetAssets = <
       }) as TData,
     ...options,
   });
+/**
+ * Get Asset Events
+ * Get asset events.
+ * @param data The data for the request.
+ * @param data.limit
+ * @param data.offset
+ * @param data.orderBy
+ * @param data.assetId
+ * @param data.sourceDagId
+ * @param data.sourceTaskId
+ * @param data.sourceRunId
+ * @param data.sourceMapIndex
+ * @returns AssetEventCollectionResponse Successful Response
+ * @throws ApiError
+ */
+export const useAssetServiceGetAssetEvents = <
+  TData = Common.AssetServiceGetAssetEventsDefaultResponse,
+  TError = unknown,
+  TQueryKey extends Array<unknown> = unknown[],
+>(
+  {
+    assetId,
+    limit,
+    offset,
+    orderBy,
+    sourceDagId,
+    sourceMapIndex,
+    sourceRunId,
+    sourceTaskId,
+  }: {
+    assetId?: number;
+    limit?: number;
+    offset?: number;
+    orderBy?: string;
+    sourceDagId?: string;
+    sourceMapIndex?: number;
+    sourceRunId?: string;
+    sourceTaskId?: string;
+  } = {},
+  queryKey?: TQueryKey,
+  options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+  useQuery<TData, TError>({
+    queryKey: Common.UseAssetServiceGetAssetEventsKeyFn(
+      {
+        assetId,
+        limit,
+        offset,
+        orderBy,
+        sourceDagId,
+        sourceMapIndex,
+        sourceRunId,
+        sourceTaskId,
+      },
+      queryKey,
+    ),
+    queryFn: () =>
+      AssetService.getAssetEvents({
+        assetId,
+        limit,
+        offset,
+        orderBy,
+        sourceDagId,
+        sourceMapIndex,
+        sourceRunId,
+        sourceTaskId,
+      }) as TData,
+    ...options,
+  });
 /**
  * Get Asset
  * Get an asset.
diff --git a/airflow/ui/openapi-gen/queries/suspense.ts 
b/airflow/ui/openapi-gen/queries/suspense.ts
index 83b76e51127..e1d8d3f9d4e 100644
--- a/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow/ui/openapi-gen/queries/suspense.ts
@@ -100,6 +100,75 @@ export const useAssetServiceGetAssetsSuspense = <
       }) as TData,
     ...options,
   });
+/**
+ * Get Asset Events
+ * Get asset events.
+ * @param data The data for the request.
+ * @param data.limit
+ * @param data.offset
+ * @param data.orderBy
+ * @param data.assetId
+ * @param data.sourceDagId
+ * @param data.sourceTaskId
+ * @param data.sourceRunId
+ * @param data.sourceMapIndex
+ * @returns AssetEventCollectionResponse Successful Response
+ * @throws ApiError
+ */
+export const useAssetServiceGetAssetEventsSuspense = <
+  TData = Common.AssetServiceGetAssetEventsDefaultResponse,
+  TError = unknown,
+  TQueryKey extends Array<unknown> = unknown[],
+>(
+  {
+    assetId,
+    limit,
+    offset,
+    orderBy,
+    sourceDagId,
+    sourceMapIndex,
+    sourceRunId,
+    sourceTaskId,
+  }: {
+    assetId?: number;
+    limit?: number;
+    offset?: number;
+    orderBy?: string;
+    sourceDagId?: string;
+    sourceMapIndex?: number;
+    sourceRunId?: string;
+    sourceTaskId?: string;
+  } = {},
+  queryKey?: TQueryKey,
+  options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+  useSuspenseQuery<TData, TError>({
+    queryKey: Common.UseAssetServiceGetAssetEventsKeyFn(
+      {
+        assetId,
+        limit,
+        offset,
+        orderBy,
+        sourceDagId,
+        sourceMapIndex,
+        sourceRunId,
+        sourceTaskId,
+      },
+      queryKey,
+    ),
+    queryFn: () =>
+      AssetService.getAssetEvents({
+        assetId,
+        limit,
+        offset,
+        orderBy,
+        sourceDagId,
+        sourceMapIndex,
+        sourceRunId,
+        sourceTaskId,
+      }) as TData,
+    ...options,
+  });
 /**
  * Get Asset
  * Get an asset.
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts 
b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index b3339e78dad..7bf8f4b0296 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -127,6 +127,114 @@ export const $AssetCollectionResponse = {
   description: "Asset collection response.",
 } as const;
 
+export const $AssetEventCollectionResponse = {
+  properties: {
+    asset_events: {
+      items: {
+        $ref: "#/components/schemas/AssetEventResponse",
+      },
+      type: "array",
+      title: "Asset Events",
+    },
+    total_entries: {
+      type: "integer",
+      title: "Total Entries",
+    },
+  },
+  type: "object",
+  required: ["asset_events", "total_entries"],
+  title: "AssetEventCollectionResponse",
+  description: "Asset event collection response.",
+} as const;
+
+export const $AssetEventResponse = {
+  properties: {
+    id: {
+      type: "integer",
+      title: "Id",
+    },
+    asset_id: {
+      type: "integer",
+      title: "Asset Id",
+    },
+    uri: {
+      type: "string",
+      title: "Uri",
+    },
+    extra: {
+      anyOf: [
+        {
+          type: "object",
+        },
+        {
+          type: "null",
+        },
+      ],
+      title: "Extra",
+    },
+    source_task_id: {
+      anyOf: [
+        {
+          type: "string",
+        },
+        {
+          type: "null",
+        },
+      ],
+      title: "Source Task Id",
+    },
+    source_dag_id: {
+      anyOf: [
+        {
+          type: "string",
+        },
+        {
+          type: "null",
+        },
+      ],
+      title: "Source Dag Id",
+    },
+    source_run_id: {
+      anyOf: [
+        {
+          type: "string",
+        },
+        {
+          type: "null",
+        },
+      ],
+      title: "Source Run Id",
+    },
+    source_map_index: {
+      type: "integer",
+      title: "Source Map Index",
+    },
+    created_dagruns: {
+      items: {
+        $ref: "#/components/schemas/DagRunAssetReference",
+      },
+      type: "array",
+      title: "Created Dagruns",
+    },
+    timestamp: {
+      type: "string",
+      format: "date-time",
+      title: "Timestamp",
+    },
+  },
+  type: "object",
+  required: [
+    "id",
+    "asset_id",
+    "uri",
+    "source_map_index",
+    "created_dagruns",
+    "timestamp",
+  ],
+  title: "AssetEventResponse",
+  description: "Asset event serializer for responses.",
+} as const;
+
 export const $AssetResponse = {
   properties: {
     id: {
@@ -1795,6 +1903,61 @@ export const $DagProcessorInfoSchema = {
   description: "Schema for DagProcessor info.",
 } as const;
 
+export const $DagRunAssetReference = {
+  properties: {
+    run_id: {
+      type: "string",
+      title: "Run Id",
+    },
+    dag_id: {
+      type: "string",
+      title: "Dag Id",
+    },
+    logical_date: {
+      type: "string",
+      format: "date-time",
+      title: "Logical Date",
+    },
+    start_date: {
+      type: "string",
+      format: "date-time",
+      title: "Start Date",
+    },
+    end_date: {
+      type: "string",
+      format: "date-time",
+      title: "End Date",
+    },
+    state: {
+      type: "string",
+      title: "State",
+    },
+    data_interval_start: {
+      type: "string",
+      format: "date-time",
+      title: "Data Interval Start",
+    },
+    data_interval_end: {
+      type: "string",
+      format: "date-time",
+      title: "Data Interval End",
+    },
+  },
+  type: "object",
+  required: [
+    "run_id",
+    "dag_id",
+    "logical_date",
+    "start_date",
+    "end_date",
+    "state",
+    "data_interval_start",
+    "data_interval_end",
+  ],
+  title: "DagRunAssetReference",
+  description: "DAGRun serializer for asset responses.",
+} as const;
+
 export const $DagRunState = {
   type: "string",
   enum: ["queued", "running", "success", "failed"],
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts 
b/airflow/ui/openapi-gen/requests/services.gen.ts
index 34fa7f24765..ee4ed1e4c41 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -7,6 +7,8 @@ import type {
   NextRunAssetsResponse,
   GetAssetsData,
   GetAssetsResponse,
+  GetAssetEventsData,
+  GetAssetEventsResponse,
   GetAssetData,
   GetAssetResponse,
   HistoricalMetricsData,
@@ -172,6 +174,46 @@ export class AssetService {
     });
   }
 
+  /**
+   * Get Asset Events
+   * Get asset events.
+   * @param data The data for the request.
+   * @param data.limit
+   * @param data.offset
+   * @param data.orderBy
+   * @param data.assetId
+   * @param data.sourceDagId
+   * @param data.sourceTaskId
+   * @param data.sourceRunId
+   * @param data.sourceMapIndex
+   * @returns AssetEventCollectionResponse Successful Response
+   * @throws ApiError
+   */
+  public static getAssetEvents(
+    data: GetAssetEventsData = {},
+  ): CancelablePromise<GetAssetEventsResponse> {
+    return __request(OpenAPI, {
+      method: "GET",
+      url: "/public/assets/events",
+      query: {
+        limit: data.limit,
+        offset: data.offset,
+        order_by: data.orderBy,
+        asset_id: data.assetId,
+        source_dag_id: data.sourceDagId,
+        source_task_id: data.sourceTaskId,
+        source_run_id: data.sourceRunId,
+        source_map_index: data.sourceMapIndex,
+      },
+      errors: {
+        401: "Unauthorized",
+        403: "Forbidden",
+        404: "Not Found",
+        422: "Validation Error",
+      },
+    });
+  }
+
   /**
    * Get Asset
    * Get an asset.
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts 
b/airflow/ui/openapi-gen/requests/types.gen.ts
index 6c036f76bf2..c4eee6b97c2 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -37,6 +37,32 @@ export type AssetCollectionResponse = {
   total_entries: number;
 };
 
+/**
+ * Asset event collection response.
+ */
+export type AssetEventCollectionResponse = {
+  asset_events: Array<AssetEventResponse>;
+  total_entries: number;
+};
+
+/**
+ * Asset event serializer for responses.
+ */
+export type AssetEventResponse = {
+  id: number;
+  asset_id: number;
+  uri: string;
+  extra?: {
+    [key: string]: unknown;
+  } | null;
+  source_task_id?: string | null;
+  source_dag_id?: string | null;
+  source_run_id?: string | null;
+  source_map_index: number;
+  created_dagruns: Array<DagRunAssetReference>;
+  timestamp: string;
+};
+
 /**
  * Asset serializer for responses.
  */
@@ -385,6 +411,20 @@ export type DagProcessorInfoSchema = {
   latest_dag_processor_heartbeat: string | null;
 };
 
+/**
+ * DAGRun serializer for asset responses.
+ */
+export type DagRunAssetReference = {
+  run_id: string;
+  dag_id: string;
+  logical_date: string;
+  start_date: string;
+  end_date: string;
+  state: string;
+  data_interval_start: string;
+  data_interval_end: string;
+};
+
 /**
  * All possible states that a DagRun can be in.
  *
@@ -931,6 +971,19 @@ export type GetAssetsData = {
 
 export type GetAssetsResponse = AssetCollectionResponse;
 
+export type GetAssetEventsData = {
+  assetId?: number | null;
+  limit?: number;
+  offset?: number;
+  orderBy?: string;
+  sourceDagId?: string | null;
+  sourceMapIndex?: number | null;
+  sourceRunId?: string | null;
+  sourceTaskId?: string | null;
+};
+
+export type GetAssetEventsResponse = AssetEventCollectionResponse;
+
 export type GetAssetData = {
   uri: string;
 };
@@ -1424,6 +1477,33 @@ export type $OpenApiTs = {
       };
     };
   };
+  "/public/assets/events": {
+    get: {
+      req: GetAssetEventsData;
+      res: {
+        /**
+         * Successful Response
+         */
+        200: AssetEventCollectionResponse;
+        /**
+         * Unauthorized
+         */
+        401: HTTPExceptionResponse;
+        /**
+         * Forbidden
+         */
+        403: HTTPExceptionResponse;
+        /**
+         * Not Found
+         */
+        404: HTTPExceptionResponse;
+        /**
+         * Validation Error
+         */
+        422: HTTPValidationError;
+      };
+    };
+  };
   "/public/assets/{uri}": {
     get: {
       req: GetAssetData;
diff --git a/tests/api_fastapi/core_api/routes/public/test_assets.py 
b/tests/api_fastapi/core_api/routes/public/test_assets.py
index eb72c1a99ac..95ad658ba4d 100644
--- a/tests/api_fastapi/core_api/routes/public/test_assets.py
+++ b/tests/api_fastapi/core_api/routes/public/test_assets.py
@@ -21,12 +21,15 @@ import urllib
 import pytest
 
 from airflow.models import DagModel
-from airflow.models.asset import AssetModel, DagScheduleAssetReference, 
TaskOutletAssetReference
+from airflow.models.asset import AssetEvent, AssetModel, 
DagScheduleAssetReference, TaskOutletAssetReference
+from airflow.models.dagrun import DagRun
 from airflow.utils import timezone
 from airflow.utils.session import provide_session
+from airflow.utils.state import DagRunState
+from airflow.utils.types import DagRunType
 
 from tests_common.test_utils.asserts import assert_queries_count
-from tests_common.test_utils.db import clear_db_assets
+from tests_common.test_utils.db import clear_db_assets, clear_db_runs
 
 pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode]
 
@@ -52,15 +55,70 @@ def _create_provided_asset(session, asset: AssetModel) -> 
None:
     session.commit()
 
 
+def _create_assets_events(session, num: int = 2) -> None:
+    default_time = "2020-06-11T18:00:00+00:00"
+    assets_events = [
+        AssetEvent(
+            id=i,
+            asset_id=i,
+            extra={"foo": "bar"},
+            source_task_id="source_task_id",
+            source_dag_id="source_dag_id",
+            source_run_id=f"source_run_id_{i}",
+            timestamp=timezone.parse(default_time),
+        )
+        for i in range(1, 1 + num)
+    ]
+    session.add_all(assets_events)
+    session.commit()
+
+
+def _create_provided_asset_event(session, asset_event: AssetEvent) -> None:
+    session.add(asset_event)
+    session.commit()
+
+
+def _create_dag_run(session, num: int = 2):
+    default_time = "2020-06-11T18:00:00+00:00"
+    dag_runs = [
+        DagRun(
+            dag_id="source_dag_id",
+            run_id=f"source_run_id_{i}",
+            run_type=DagRunType.MANUAL,
+            execution_date=timezone.parse(default_time),
+            start_date=timezone.parse(default_time),
+            data_interval=(timezone.parse(default_time), 
timezone.parse(default_time)),
+            external_trigger=True,
+            state=DagRunState.SUCCESS,
+        )
+        for i in range(1, 1 + num)
+    ]
+    for dag_run in dag_runs:
+        dag_run.end_date = timezone.parse(default_time)
+    session.add_all(dag_runs)
+    session.commit()
+
+
+def _create_asset_dag_run(session, num: int = 2):
+    for i in range(1, 1 + num):
+        dag_run = 
session.query(DagRun).filter_by(run_id=f"source_run_id_{i}").first()
+        asset_event = session.query(AssetEvent).filter_by(id=i).first()
+        if dag_run and asset_event:
+            dag_run.consumed_asset_events.append(asset_event)
+    session.commit()
+
+
 class TestAssets:
     default_time = "2020-06-11T18:00:00+00:00"
 
     @pytest.fixture(autouse=True)
     def setup(self) -> None:
         clear_db_assets()
+        clear_db_runs()
 
     def teardown_method(self) -> None:
         clear_db_assets()
+        clear_db_runs()
 
     @provide_session
     def create_assets(self, session, num: int = 2):
@@ -70,6 +128,22 @@ class TestAssets:
     def create_provided_asset(self, session, asset: AssetModel):
         _create_provided_asset(session=session, asset=asset)
 
+    @provide_session
+    def create_assets_events(self, session, num: int = 2):
+        _create_assets_events(session=session, num=num)
+
+    @provide_session
+    def create_provided_asset_event(self, session, asset_event: AssetEvent):
+        _create_provided_asset_event(session=session, asset_event=asset_event)
+
+    @provide_session
+    def create_dag_run(self, session, num: int = 2):
+        _create_dag_run(num=num, session=session)
+
+    @provide_session
+    def create_asset_dag_run(self, session, num: int = 2):
+        _create_asset_dag_run(num=num, session=session)
+
 
 class TestGetAssets(TestAssets):
     def test_should_respond_200(self, test_client, session):
@@ -234,6 +308,120 @@ class TestGetAssetsEndpointPagination(TestAssets):
         assert len(response.json()["assets"]) == 100
 
 
+class TestGetAssetEvents(TestAssets):
+    def test_should_respond_200(self, test_client, session):
+        self.create_assets()
+        self.create_assets_events()
+        self.create_dag_run()
+        self.create_asset_dag_run()
+        assets = session.query(AssetEvent).all()
+        assert len(assets) == 2
+        response = test_client.get("/public/assets/events")
+        assert response.status_code == 200
+        response_data = response.json()
+        assert response_data == {
+            "asset_events": [
+                {
+                    "id": 1,
+                    "asset_id": 1,
+                    "uri": "s3://bucket/key/1",
+                    "extra": {"foo": "bar"},
+                    "source_task_id": "source_task_id",
+                    "source_dag_id": "source_dag_id",
+                    "source_run_id": "source_run_id_1",
+                    "source_map_index": -1,
+                    "created_dagruns": [
+                        {
+                            "run_id": "source_run_id_1",
+                            "dag_id": "source_dag_id",
+                            "logical_date": "2020-06-11T18:00:00Z",
+                            "start_date": "2020-06-11T18:00:00Z",
+                            "end_date": "2020-06-11T18:00:00Z",
+                            "state": "success",
+                            "data_interval_start": "2020-06-11T18:00:00Z",
+                            "data_interval_end": "2020-06-11T18:00:00Z",
+                        }
+                    ],
+                    "timestamp": "2020-06-11T18:00:00Z",
+                },
+                {
+                    "id": 2,
+                    "asset_id": 2,
+                    "uri": "s3://bucket/key/2",
+                    "extra": {"foo": "bar"},
+                    "source_task_id": "source_task_id",
+                    "source_dag_id": "source_dag_id",
+                    "source_run_id": "source_run_id_2",
+                    "source_map_index": -1,
+                    "created_dagruns": [
+                        {
+                            "run_id": "source_run_id_2",
+                            "dag_id": "source_dag_id",
+                            "logical_date": "2020-06-11T18:00:00Z",
+                            "start_date": "2020-06-11T18:00:00Z",
+                            "end_date": "2020-06-11T18:00:00Z",
+                            "state": "success",
+                            "data_interval_start": "2020-06-11T18:00:00Z",
+                            "data_interval_end": "2020-06-11T18:00:00Z",
+                        }
+                    ],
+                    "timestamp": "2020-06-11T18:00:00Z",
+                },
+            ],
+            "total_entries": 2,
+        }
+
+    @pytest.mark.parametrize(
+        "params, total_entries",
+        [
+            ({"asset_id": "2"}, 1),
+            ({"source_dag_id": "source_dag_id"}, 2),
+            ({"source_task_id": "source_task_id"}, 2),
+            ({"source_run_id": "source_run_id_1"}, 1),
+            ({"source_map_index": "-1"}, 2),
+        ],
+    )
+    @provide_session
+    def test_filtering(self, test_client, params, total_entries, session):
+        self.create_assets()
+        self.create_assets_events()
+        self.create_dag_run()
+        self.create_asset_dag_run()
+        response = test_client.get("/public/assets/events", params=params)
+        assert response.status_code == 200
+        assert response.json()["total_entries"] == total_entries
+
+    def test_order_by_raises_400_for_invalid_attr(self, test_client, session):
+        response = test_client.get("/public/assets/events?order_by=fake")
+
+        assert response.status_code == 400
+        msg = "Ordering with 'fake' is disallowed or the attribute does not 
exist on the model"
+        assert response.json()["detail"] == msg
+
+    @pytest.mark.parametrize(
+        "params, expected_asset_uris",
+        [
+            # Limit test data
+            ({"limit": "1"}, ["s3://bucket/key/1"]),
+            ({"limit": "100"}, [f"s3://bucket/key/{i}" for i in range(1, 
101)]),
+            # Offset test data
+            ({"offset": "1"}, [f"s3://bucket/key/{i}" for i in range(2, 102)]),
+            ({"offset": "3"}, [f"s3://bucket/key/{i}" for i in range(4, 104)]),
+        ],
+    )
+    def test_limit_and_offset(self, test_client, params, expected_asset_uris):
+        self.create_assets(num=110)
+        self.create_assets_events(num=110)
+        self.create_dag_run(num=110)
+        self.create_asset_dag_run(num=110)
+
+        response = test_client.get("/public/assets/events", params=params)
+
+        assert response.status_code == 200
+        asset_uris = [asset["uri"] for asset in 
response.json()["asset_events"]]
+        assert asset_uris == expected_asset_uris
+
+
 class TestGetAssetEndpoint(TestAssets):
     @pytest.mark.parametrize(
         "url",


Reply via email to