This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9a364acc1b8 AIP 84: Migrate GET ASSET EVENTS legacy API to fast API
(#43881)
9a364acc1b8 is described below
commit 9a364acc1b860a0551eb2aa31b09b09d3a739fd8
Author: vatsrahul1001 <[email protected]>
AuthorDate: Thu Nov 14 15:29:53 2024 +0530
AIP 84: Migrate GET ASSET EVENTS legacy API to fast API (#43881)
* AIP-84: Migrating GET Assets to fastAPI
* matching response to legacy
* Adding unit tests - part 1
* Update airflow/api_fastapi/common/parameters.py
Co-authored-by: Jed Cunningham
<[email protected]>
* fixing the dag_ids filter
* fixing the dag_ids filter
* Adding unit tests - part 2
* fixing unit tests & updating parameter type
* review comments pierre
* fixing last commit
* fixing unit tests
* migrating get assets events endpoint to fastapi
* fixing test response
* Adding tests for filtering
* address review comments
* fixing test parametrize
* address review comments
* address review comments
* removing http 401 and 403 as its now added in root router in #43932
---------
Co-authored-by: Amogh <[email protected]>
Co-authored-by: Jed Cunningham
<[email protected]>
---
airflow/api_connexion/endpoints/asset_endpoint.py | 1 +
airflow/api_fastapi/common/parameters.py | 97 +++++++++-
airflow/api_fastapi/core_api/datamodels/assets.py | 37 +++-
.../api_fastapi/core_api/openapi/v1-generated.yaml | 212 +++++++++++++++++++++
.../api_fastapi/core_api/routes/public/assets.py | 64 ++++++-
airflow/ui/openapi-gen/queries/common.ts | 44 +++++
airflow/ui/openapi-gen/queries/prefetch.ts | 60 ++++++
airflow/ui/openapi-gen/queries/queries.ts | 69 +++++++
airflow/ui/openapi-gen/queries/suspense.ts | 69 +++++++
airflow/ui/openapi-gen/requests/schemas.gen.ts | 163 ++++++++++++++++
airflow/ui/openapi-gen/requests/services.gen.ts | 42 ++++
airflow/ui/openapi-gen/requests/types.gen.ts | 80 ++++++++
.../core_api/routes/public/test_assets.py | 192 ++++++++++++++++++-
13 files changed, 1124 insertions(+), 6 deletions(-)
diff --git a/airflow/api_connexion/endpoints/asset_endpoint.py
b/airflow/api_connexion/endpoints/asset_endpoint.py
index 0c45ddd7095..83722520959 100644
--- a/airflow/api_connexion/endpoints/asset_endpoint.py
+++ b/airflow/api_connexion/endpoints/asset_endpoint.py
@@ -115,6 +115,7 @@ def get_assets(
return asset_collection_schema.dump(AssetCollection(assets=assets,
total_entries=total_entries))
+@mark_fastapi_migration_done
@security.requires_access_asset("GET")
@provide_session
@format_parameters({"limit": check_limit})
diff --git a/airflow/api_fastapi/common/parameters.py
b/airflow/api_fastapi/common/parameters.py
index c1d7624b37a..abf6378ac5b 100644
--- a/airflow/api_fastapi/common/parameters.py
+++ b/airflow/api_fastapi/common/parameters.py
@@ -29,7 +29,7 @@ from sqlalchemy.inspection import inspect
from airflow.api_connexion.endpoints.task_instance_endpoint import
_convert_ti_states
from airflow.models import Base, Connection
-from airflow.models.asset import AssetModel, DagScheduleAssetReference,
TaskOutletAssetReference
+from airflow.models.asset import AssetEvent, AssetModel,
DagScheduleAssetReference, TaskOutletAssetReference
from airflow.models.dag import DagModel, DagTag
from airflow.models.dagrun import DagRun
from airflow.models.dagwarning import DagWarning, DagWarningType
@@ -440,6 +440,86 @@ class _DagIdAssetReferenceFilter(BaseParam[list[str]]):
)
+class _AssetIdFilter(BaseParam[int]):
+ """Filter on asset_id."""
+
+ def __init__(self, attribute: ColumnElement, skip_none: bool = True) ->
None:
+ super().__init__(skip_none=skip_none)
+ self.attribute = attribute
+
+ def to_orm(self, select: Select) -> Select:
+ if self.value is None and self.skip_none:
+ return select
+ return select.where(self.attribute == self.value)
+
+ def depends(self, asset_id: int | None = None) -> _AssetIdFilter:
+ return self.set_value(asset_id)
+
+
+class _SourceDagIdFilter(BaseParam[str]):
+ """Filter on source_dag_id."""
+
+ def __init__(self, attribute: ColumnElement, skip_none: bool = True) ->
None:
+ super().__init__(skip_none=skip_none)
+ self.attribute = attribute
+
+ def to_orm(self, select: Select) -> Select:
+ if self.value is None and self.skip_none:
+ return select
+ return select.where(self.attribute == self.value)
+
+ def depends(self, source_dag_id: str | None = None) -> _SourceDagIdFilter:
+ return self.set_value(source_dag_id)
+
+
+class _SourceTaskIdFilter(BaseParam[str]):
+ """Filter on source_task_id."""
+
+ def __init__(self, attribute: ColumnElement, skip_none: bool = True) ->
None:
+ super().__init__(skip_none=skip_none)
+ self.attribute = attribute
+
+ def to_orm(self, select: Select) -> Select:
+ if self.value is None and self.skip_none:
+ return select
+ return select.where(self.attribute == self.value)
+
+ def depends(self, source_task_id: str | None = None) ->
_SourceTaskIdFilter:
+ return self.set_value(source_task_id)
+
+
+class _SourceRunIdFilter(BaseParam[str]):
+ """filter on source_run_id."""
+
+ def __init__(self, attribute: ColumnElement, skip_none: bool = True) ->
None:
+ super().__init__(skip_none=skip_none)
+ self.attribute = attribute
+
+ def to_orm(self, select: Select) -> Select:
+ if self.value is None and self.skip_none:
+ return select
+ return select.where(self.attribute == self.value)
+
+ def depends(self, source_run_id: str | None = None) -> _SourceRunIdFilter:
+ return self.set_value(source_run_id)
+
+
+class _SourceMapIndexFilter(BaseParam[int]):
+ """Filter on source_map_index."""
+
+ def __init__(self, attribute: ColumnElement, skip_none: bool = True) ->
None:
+ super().__init__(skip_none=skip_none)
+ self.attribute = attribute
+
+ def to_orm(self, select: Select) -> Select:
+ if self.value is None and self.skip_none:
+ return select
+ return select.where(self.attribute == self.value)
+
+ def depends(self, source_map_index: int | None = None) ->
_SourceMapIndexFilter:
+ return self.set_value(source_map_index)
+
+
class Range(BaseModel, Generic[T]):
"""Range with a lower and upper bound."""
@@ -537,3 +617,18 @@ QueryUriPatternSearch = Annotated[_UriPatternSearch,
Depends(_UriPatternSearch()
QueryAssetDagIdPatternSearch = Annotated[
_DagIdAssetReferenceFilter, Depends(_DagIdAssetReferenceFilter().depends)
]
+QueryAssetIdFilter = Annotated[_AssetIdFilter,
Depends(_AssetIdFilter(AssetEvent.asset_id).depends)]
+
+
+QuerySourceDagIdFilter = Annotated[
+ _SourceDagIdFilter,
Depends(_SourceDagIdFilter(AssetEvent.source_dag_id).depends)
+]
+QuerySourceTaskIdFilter = Annotated[
+ _SourceTaskIdFilter,
Depends(_SourceTaskIdFilter(AssetEvent.source_task_id).depends)
+]
+QuerySourceRunIdFilter = Annotated[
+ _SourceRunIdFilter,
Depends(_SourceRunIdFilter(AssetEvent.source_run_id).depends)
+]
+QuerySourceMapIndexFilter = Annotated[
+ _SourceMapIndexFilter,
Depends(_SourceMapIndexFilter(AssetEvent.source_map_index).depends)
+]
diff --git a/airflow/api_fastapi/core_api/datamodels/assets.py
b/airflow/api_fastapi/core_api/datamodels/assets.py
index 498dc4edae6..85e41ff7b56 100644
--- a/airflow/api_fastapi/core_api/datamodels/assets.py
+++ b/airflow/api_fastapi/core_api/datamodels/assets.py
@@ -19,7 +19,7 @@ from __future__ import annotations
from datetime import datetime
-from pydantic import BaseModel
+from pydantic import BaseModel, Field
class DagScheduleAssetReference(BaseModel):
@@ -64,3 +64,38 @@ class AssetCollectionResponse(BaseModel):
assets: list[AssetResponse]
total_entries: int
+
+
+class DagRunAssetReference(BaseModel):
+ """DAGRun serializer for asset responses."""
+
+ run_id: str
+ dag_id: str
+ execution_date: datetime = Field(alias="logical_date")
+ start_date: datetime
+ end_date: datetime
+ state: str
+ data_interval_start: datetime
+ data_interval_end: datetime
+
+
+class AssetEventResponse(BaseModel):
+ """Asset event serializer for responses."""
+
+ id: int
+ asset_id: int
+ uri: str
+ extra: dict | None = None
+ source_task_id: str | None = None
+ source_dag_id: str | None = None
+ source_run_id: str | None = None
+ source_map_index: int
+ created_dagruns: list[DagRunAssetReference]
+ timestamp: datetime
+
+
+class AssetEventCollectionResponse(BaseModel):
+ """Asset event collection response."""
+
+ asset_events: list[AssetEventResponse]
+ total_entries: int
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index 6efa1e44585..b99b389de51 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -246,6 +246,106 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
+ /public/assets/events:
+ get:
+ tags:
+ - Asset
+ summary: Get Asset Events
+ description: Get asset events.
+ operationId: get_asset_events
+ parameters:
+ - name: limit
+ in: query
+ required: false
+ schema:
+ type: integer
+ default: 100
+ title: Limit
+ - name: offset
+ in: query
+ required: false
+ schema:
+ type: integer
+ default: 0
+ title: Offset
+ - name: order_by
+ in: query
+ required: false
+ schema:
+ type: string
+ default: timestamp
+ title: Order By
+ - name: asset_id
+ in: query
+ required: false
+ schema:
+ anyOf:
+ - type: integer
+ - type: 'null'
+ title: Asset Id
+ - name: source_dag_id
+ in: query
+ required: false
+ schema:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Source Dag Id
+ - name: source_task_id
+ in: query
+ required: false
+ schema:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Source Task Id
+ - name: source_run_id
+ in: query
+ required: false
+ schema:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Source Run Id
+ - name: source_map_index
+ in: query
+ required: false
+ schema:
+ anyOf:
+ - type: integer
+ - type: 'null'
+ title: Source Map Index
+ responses:
+ '200':
+ description: Successful Response
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/AssetEventCollectionResponse'
+ '401':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Unauthorized
+ '403':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Forbidden
+ '404':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Not Found
+ '422':
+ description: Validation Error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPValidationError'
/public/assets/{uri}:
get:
tags:
@@ -3567,6 +3667,75 @@ components:
- total_entries
title: AssetCollectionResponse
description: Asset collection response.
+ AssetEventCollectionResponse:
+ properties:
+ asset_events:
+ items:
+ $ref: '#/components/schemas/AssetEventResponse'
+ type: array
+ title: Asset Events
+ total_entries:
+ type: integer
+ title: Total Entries
+ type: object
+ required:
+ - asset_events
+ - total_entries
+ title: AssetEventCollectionResponse
+ description: Asset event collection response.
+ AssetEventResponse:
+ properties:
+ id:
+ type: integer
+ title: Id
+ asset_id:
+ type: integer
+ title: Asset Id
+ uri:
+ type: string
+ title: Uri
+ extra:
+ anyOf:
+ - type: object
+ - type: 'null'
+ title: Extra
+ source_task_id:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Source Task Id
+ source_dag_id:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Source Dag Id
+ source_run_id:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Source Run Id
+ source_map_index:
+ type: integer
+ title: Source Map Index
+ created_dagruns:
+ items:
+ $ref: '#/components/schemas/DagRunAssetReference'
+ type: array
+ title: Created Dagruns
+ timestamp:
+ type: string
+ format: date-time
+ title: Timestamp
+ type: object
+ required:
+ - id
+ - asset_id
+ - uri
+ - source_map_index
+ - created_dagruns
+ - timestamp
+ title: AssetEventResponse
+ description: Asset event serializer for responses.
AssetResponse:
properties:
id:
@@ -4631,6 +4800,49 @@ components:
- latest_dag_processor_heartbeat
title: DagProcessorInfoSchema
description: Schema for DagProcessor info.
+ DagRunAssetReference:
+ properties:
+ run_id:
+ type: string
+ title: Run Id
+ dag_id:
+ type: string
+ title: Dag Id
+ logical_date:
+ type: string
+ format: date-time
+ title: Logical Date
+ start_date:
+ type: string
+ format: date-time
+ title: Start Date
+ end_date:
+ type: string
+ format: date-time
+ title: End Date
+ state:
+ type: string
+ title: State
+ data_interval_start:
+ type: string
+ format: date-time
+ title: Data Interval Start
+ data_interval_end:
+ type: string
+ format: date-time
+ title: Data Interval End
+ type: object
+ required:
+ - run_id
+ - dag_id
+ - logical_date
+ - start_date
+ - end_date
+ - state
+ - data_interval_start
+ - data_interval_end
+ title: DagRunAssetReference
+ description: DAGRun serializer for asset responses.
DagRunState:
type: string
enum:
diff --git a/airflow/api_fastapi/core_api/routes/public/assets.py
b/airflow/api_fastapi/core_api/routes/public/assets.py
index d093eef3720..67218c47161 100644
--- a/airflow/api_fastapi/core_api/routes/public/assets.py
+++ b/airflow/api_fastapi/core_api/routes/public/assets.py
@@ -26,15 +26,25 @@ from sqlalchemy.orm import Session, joinedload, subqueryload
from airflow.api_fastapi.common.db.common import get_session, paginated_select
from airflow.api_fastapi.common.parameters import (
QueryAssetDagIdPatternSearch,
+ QueryAssetIdFilter,
QueryLimit,
QueryOffset,
+ QuerySourceDagIdFilter,
+ QuerySourceMapIndexFilter,
+ QuerySourceRunIdFilter,
+ QuerySourceTaskIdFilter,
QueryUriPatternSearch,
SortParam,
)
from airflow.api_fastapi.common.router import AirflowRouter
-from airflow.api_fastapi.core_api.datamodels.assets import
AssetCollectionResponse, AssetResponse
+from airflow.api_fastapi.core_api.datamodels.assets import (
+ AssetCollectionResponse,
+ AssetEventCollectionResponse,
+ AssetEventResponse,
+ AssetResponse,
+)
from airflow.api_fastapi.core_api.openapi.exceptions import
create_openapi_http_exception_doc
-from airflow.models.asset import AssetModel
+from airflow.models.asset import AssetEvent, AssetModel
assets_router = AirflowRouter(tags=["Asset"], prefix="/assets")
@@ -74,6 +84,56 @@ def get_assets(
)
+@assets_router.get(
+ "/events",
+ responses=create_openapi_http_exception_doc([404]),
+)
+def get_asset_events(
+ limit: QueryLimit,
+ offset: QueryOffset,
+ order_by: Annotated[
+ SortParam,
+ Depends(
+ SortParam(
+ [
+ "source_task_id",
+ "source_dag_id",
+ "source_run_id",
+ "source_map_index",
+ "timestamp",
+ ],
+ AssetEvent,
+ ).dynamic_depends("timestamp")
+ ),
+ ],
+ asset_id: QueryAssetIdFilter,
+ source_dag_id: QuerySourceDagIdFilter,
+ source_task_id: QuerySourceTaskIdFilter,
+ source_run_id: QuerySourceRunIdFilter,
+ source_map_index: QuerySourceMapIndexFilter,
+ session: Annotated[Session, Depends(get_session)],
+) -> AssetEventCollectionResponse:
+ """Get asset events."""
+ assets_event_select, total_entries = paginated_select(
+ select(AssetEvent),
+ filters=[asset_id, source_dag_id, source_task_id, source_run_id,
source_map_index],
+ order_by=order_by,
+ offset=offset,
+ limit=limit,
+ session=session,
+ )
+
+ assets_event_select =
assets_event_select.options(subqueryload(AssetEvent.created_dagruns))
+ assets_events = session.scalars(assets_event_select).all()
+
+ return AssetEventCollectionResponse(
+ asset_events=[
+ AssetEventResponse.model_validate(asset, from_attributes=True) for
asset in assets_events
+ ],
+ total_entries=total_entries,
+ )
+
+
@assets_router.get(
"/{uri:path}",
responses=create_openapi_http_exception_doc([401, 403, 404]),
diff --git a/airflow/ui/openapi-gen/queries/common.ts
b/airflow/ui/openapi-gen/queries/common.ts
index 1fff143182f..bc3cabf3792 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -69,6 +69,50 @@ export const UseAssetServiceGetAssetsKeyFn = (
useAssetServiceGetAssetsKey,
...(queryKey ?? [{ dagIds, limit, offset, orderBy, uriPattern }]),
];
+export type AssetServiceGetAssetEventsDefaultResponse = Awaited<
+ ReturnType<typeof AssetService.getAssetEvents>
+>;
+export type AssetServiceGetAssetEventsQueryResult<
+ TData = AssetServiceGetAssetEventsDefaultResponse,
+ TError = unknown,
+> = UseQueryResult<TData, TError>;
+export const useAssetServiceGetAssetEventsKey = "AssetServiceGetAssetEvents";
+export const UseAssetServiceGetAssetEventsKeyFn = (
+ {
+ assetId,
+ limit,
+ offset,
+ orderBy,
+ sourceDagId,
+ sourceMapIndex,
+ sourceRunId,
+ sourceTaskId,
+ }: {
+ assetId?: number;
+ limit?: number;
+ offset?: number;
+ orderBy?: string;
+ sourceDagId?: string;
+ sourceMapIndex?: number;
+ sourceRunId?: string;
+ sourceTaskId?: string;
+ } = {},
+ queryKey?: Array<unknown>,
+) => [
+ useAssetServiceGetAssetEventsKey,
+ ...(queryKey ?? [
+ {
+ assetId,
+ limit,
+ offset,
+ orderBy,
+ sourceDagId,
+ sourceMapIndex,
+ sourceRunId,
+ sourceTaskId,
+ },
+ ]),
+];
export type AssetServiceGetAssetDefaultResponse = Awaited<
ReturnType<typeof AssetService.getAsset>
>;
diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts
b/airflow/ui/openapi-gen/queries/prefetch.ts
index b822e35dd33..f5f120e5555 100644
--- a/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -85,6 +85,66 @@ export const prefetchUseAssetServiceGetAssets = (
queryFn: () =>
AssetService.getAssets({ dagIds, limit, offset, orderBy, uriPattern }),
});
+/**
+ * Get Asset Events
+ * Get asset events.
+ * @param data The data for the request.
+ * @param data.limit
+ * @param data.offset
+ * @param data.orderBy
+ * @param data.assetId
+ * @param data.sourceDagId
+ * @param data.sourceTaskId
+ * @param data.sourceRunId
+ * @param data.sourceMapIndex
+ * @returns AssetEventCollectionResponse Successful Response
+ * @throws ApiError
+ */
+export const prefetchUseAssetServiceGetAssetEvents = (
+ queryClient: QueryClient,
+ {
+ assetId,
+ limit,
+ offset,
+ orderBy,
+ sourceDagId,
+ sourceMapIndex,
+ sourceRunId,
+ sourceTaskId,
+ }: {
+ assetId?: number;
+ limit?: number;
+ offset?: number;
+ orderBy?: string;
+ sourceDagId?: string;
+ sourceMapIndex?: number;
+ sourceRunId?: string;
+ sourceTaskId?: string;
+ } = {},
+) =>
+ queryClient.prefetchQuery({
+ queryKey: Common.UseAssetServiceGetAssetEventsKeyFn({
+ assetId,
+ limit,
+ offset,
+ orderBy,
+ sourceDagId,
+ sourceMapIndex,
+ sourceRunId,
+ sourceTaskId,
+ }),
+ queryFn: () =>
+ AssetService.getAssetEvents({
+ assetId,
+ limit,
+ offset,
+ orderBy,
+ sourceDagId,
+ sourceMapIndex,
+ sourceRunId,
+ sourceTaskId,
+ }),
+ });
/**
* Get Asset
* Get an asset.
diff --git a/airflow/ui/openapi-gen/queries/queries.ts
b/airflow/ui/openapi-gen/queries/queries.ts
index 475126ef506..f16ebde095f 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -115,6 +115,75 @@ export const useAssetServiceGetAssets = <
}) as TData,
...options,
});
+/**
+ * Get Asset Events
+ * Get asset events.
+ * @param data The data for the request.
+ * @param data.limit
+ * @param data.offset
+ * @param data.orderBy
+ * @param data.assetId
+ * @param data.sourceDagId
+ * @param data.sourceTaskId
+ * @param data.sourceRunId
+ * @param data.sourceMapIndex
+ * @returns AssetEventCollectionResponse Successful Response
+ * @throws ApiError
+ */
+export const useAssetServiceGetAssetEvents = <
+ TData = Common.AssetServiceGetAssetEventsDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ assetId,
+ limit,
+ offset,
+ orderBy,
+ sourceDagId,
+ sourceMapIndex,
+ sourceRunId,
+ sourceTaskId,
+ }: {
+ assetId?: number;
+ limit?: number;
+ offset?: number;
+ orderBy?: string;
+ sourceDagId?: string;
+ sourceMapIndex?: number;
+ sourceRunId?: string;
+ sourceTaskId?: string;
+ } = {},
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useQuery<TData, TError>({
+ queryKey: Common.UseAssetServiceGetAssetEventsKeyFn(
+ {
+ assetId,
+ limit,
+ offset,
+ orderBy,
+ sourceDagId,
+ sourceMapIndex,
+ sourceRunId,
+ sourceTaskId,
+ },
+ queryKey,
+ ),
+ queryFn: () =>
+ AssetService.getAssetEvents({
+ assetId,
+ limit,
+ offset,
+ orderBy,
+ sourceDagId,
+ sourceMapIndex,
+ sourceRunId,
+ sourceTaskId,
+ }) as TData,
+ ...options,
+ });
/**
* Get Asset
* Get an asset.
diff --git a/airflow/ui/openapi-gen/queries/suspense.ts
b/airflow/ui/openapi-gen/queries/suspense.ts
index 83b76e51127..e1d8d3f9d4e 100644
--- a/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow/ui/openapi-gen/queries/suspense.ts
@@ -100,6 +100,75 @@ export const useAssetServiceGetAssetsSuspense = <
}) as TData,
...options,
});
+/**
+ * Get Asset Events
+ * Get asset events.
+ * @param data The data for the request.
+ * @param data.limit
+ * @param data.offset
+ * @param data.orderBy
+ * @param data.assetId
+ * @param data.sourceDagId
+ * @param data.sourceTaskId
+ * @param data.sourceRunId
+ * @param data.sourceMapIndex
+ * @returns AssetEventCollectionResponse Successful Response
+ * @throws ApiError
+ */
+export const useAssetServiceGetAssetEventsSuspense = <
+ TData = Common.AssetServiceGetAssetEventsDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ assetId,
+ limit,
+ offset,
+ orderBy,
+ sourceDagId,
+ sourceMapIndex,
+ sourceRunId,
+ sourceTaskId,
+ }: {
+ assetId?: number;
+ limit?: number;
+ offset?: number;
+ orderBy?: string;
+ sourceDagId?: string;
+ sourceMapIndex?: number;
+ sourceRunId?: string;
+ sourceTaskId?: string;
+ } = {},
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useSuspenseQuery<TData, TError>({
+ queryKey: Common.UseAssetServiceGetAssetEventsKeyFn(
+ {
+ assetId,
+ limit,
+ offset,
+ orderBy,
+ sourceDagId,
+ sourceMapIndex,
+ sourceRunId,
+ sourceTaskId,
+ },
+ queryKey,
+ ),
+ queryFn: () =>
+ AssetService.getAssetEvents({
+ assetId,
+ limit,
+ offset,
+ orderBy,
+ sourceDagId,
+ sourceMapIndex,
+ sourceRunId,
+ sourceTaskId,
+ }) as TData,
+ ...options,
+ });
/**
* Get Asset
* Get an asset.
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index b3339e78dad..7bf8f4b0296 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -127,6 +127,114 @@ export const $AssetCollectionResponse = {
description: "Asset collection response.",
} as const;
+export const $AssetEventCollectionResponse = {
+ properties: {
+ asset_events: {
+ items: {
+ $ref: "#/components/schemas/AssetEventResponse",
+ },
+ type: "array",
+ title: "Asset Events",
+ },
+ total_entries: {
+ type: "integer",
+ title: "Total Entries",
+ },
+ },
+ type: "object",
+ required: ["asset_events", "total_entries"],
+ title: "AssetEventCollectionResponse",
+ description: "Asset event collection response.",
+} as const;
+
+export const $AssetEventResponse = {
+ properties: {
+ id: {
+ type: "integer",
+ title: "Id",
+ },
+ asset_id: {
+ type: "integer",
+ title: "Asset Id",
+ },
+ uri: {
+ type: "string",
+ title: "Uri",
+ },
+ extra: {
+ anyOf: [
+ {
+ type: "object",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Extra",
+ },
+ source_task_id: {
+ anyOf: [
+ {
+ type: "string",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Source Task Id",
+ },
+ source_dag_id: {
+ anyOf: [
+ {
+ type: "string",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Source Dag Id",
+ },
+ source_run_id: {
+ anyOf: [
+ {
+ type: "string",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Source Run Id",
+ },
+ source_map_index: {
+ type: "integer",
+ title: "Source Map Index",
+ },
+ created_dagruns: {
+ items: {
+ $ref: "#/components/schemas/DagRunAssetReference",
+ },
+ type: "array",
+ title: "Created Dagruns",
+ },
+ timestamp: {
+ type: "string",
+ format: "date-time",
+ title: "Timestamp",
+ },
+ },
+ type: "object",
+ required: [
+ "id",
+ "asset_id",
+ "uri",
+ "source_map_index",
+ "created_dagruns",
+ "timestamp",
+ ],
+ title: "AssetEventResponse",
+ description: "Asset event serializer for responses.",
+} as const;
+
export const $AssetResponse = {
properties: {
id: {
@@ -1795,6 +1903,61 @@ export const $DagProcessorInfoSchema = {
description: "Schema for DagProcessor info.",
} as const;
+export const $DagRunAssetReference = {
+ properties: {
+ run_id: {
+ type: "string",
+ title: "Run Id",
+ },
+ dag_id: {
+ type: "string",
+ title: "Dag Id",
+ },
+ logical_date: {
+ type: "string",
+ format: "date-time",
+ title: "Logical Date",
+ },
+ start_date: {
+ type: "string",
+ format: "date-time",
+ title: "Start Date",
+ },
+ end_date: {
+ type: "string",
+ format: "date-time",
+ title: "End Date",
+ },
+ state: {
+ type: "string",
+ title: "State",
+ },
+ data_interval_start: {
+ type: "string",
+ format: "date-time",
+ title: "Data Interval Start",
+ },
+ data_interval_end: {
+ type: "string",
+ format: "date-time",
+ title: "Data Interval End",
+ },
+ },
+ type: "object",
+ required: [
+ "run_id",
+ "dag_id",
+ "logical_date",
+ "start_date",
+ "end_date",
+ "state",
+ "data_interval_start",
+ "data_interval_end",
+ ],
+ title: "DagRunAssetReference",
+ description: "DAGRun serializer for asset responses.",
+} as const;
+
export const $DagRunState = {
type: "string",
enum: ["queued", "running", "success", "failed"],
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts
b/airflow/ui/openapi-gen/requests/services.gen.ts
index 34fa7f24765..ee4ed1e4c41 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -7,6 +7,8 @@ import type {
NextRunAssetsResponse,
GetAssetsData,
GetAssetsResponse,
+ GetAssetEventsData,
+ GetAssetEventsResponse,
GetAssetData,
GetAssetResponse,
HistoricalMetricsData,
@@ -172,6 +174,46 @@ export class AssetService {
});
}
+ /**
+ * Get Asset Events
+ * Get asset events.
+ * @param data The data for the request.
+ * @param data.limit
+ * @param data.offset
+ * @param data.orderBy
+ * @param data.assetId
+ * @param data.sourceDagId
+ * @param data.sourceTaskId
+ * @param data.sourceRunId
+ * @param data.sourceMapIndex
+ * @returns AssetEventCollectionResponse Successful Response
+ * @throws ApiError
+ */
+ public static getAssetEvents(
+ data: GetAssetEventsData = {},
+ ): CancelablePromise<GetAssetEventsResponse> {
+ return __request(OpenAPI, {
+ method: "GET",
+ url: "/public/assets/events",
+ query: {
+ limit: data.limit,
+ offset: data.offset,
+ order_by: data.orderBy,
+ asset_id: data.assetId,
+ source_dag_id: data.sourceDagId,
+ source_task_id: data.sourceTaskId,
+ source_run_id: data.sourceRunId,
+ source_map_index: data.sourceMapIndex,
+ },
+ errors: {
+ 401: "Unauthorized",
+ 403: "Forbidden",
+ 404: "Not Found",
+ 422: "Validation Error",
+ },
+ });
+ }
+
/**
* Get Asset
* Get an asset.
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow/ui/openapi-gen/requests/types.gen.ts
index 6c036f76bf2..c4eee6b97c2 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -37,6 +37,32 @@ export type AssetCollectionResponse = {
total_entries: number;
};
+/**
+ * Asset event collection response.
+ */
+export type AssetEventCollectionResponse = {
+ asset_events: Array<AssetEventResponse>;
+ total_entries: number;
+};
+
+/**
+ * Asset event serializer for responses.
+ */
+export type AssetEventResponse = {
+ id: number;
+ asset_id: number;
+ uri: string;
+ extra?: {
+ [key: string]: unknown;
+ } | null;
+ source_task_id?: string | null;
+ source_dag_id?: string | null;
+ source_run_id?: string | null;
+ source_map_index: number;
+ created_dagruns: Array<DagRunAssetReference>;
+ timestamp: string;
+};
+
/**
* Asset serializer for responses.
*/
@@ -385,6 +411,20 @@ export type DagProcessorInfoSchema = {
latest_dag_processor_heartbeat: string | null;
};
+/**
+ * DAGRun serializer for asset responses.
+ */
+export type DagRunAssetReference = {
+ run_id: string;
+ dag_id: string;
+ logical_date: string;
+ start_date: string;
+ end_date: string;
+ state: string;
+ data_interval_start: string;
+ data_interval_end: string;
+};
+
/**
* All possible states that a DagRun can be in.
*
@@ -931,6 +971,19 @@ export type GetAssetsData = {
export type GetAssetsResponse = AssetCollectionResponse;
+export type GetAssetEventsData = {
+ assetId?: number | null;
+ limit?: number;
+ offset?: number;
+ orderBy?: string;
+ sourceDagId?: string | null;
+ sourceMapIndex?: number | null;
+ sourceRunId?: string | null;
+ sourceTaskId?: string | null;
+};
+
+export type GetAssetEventsResponse = AssetEventCollectionResponse;
+
export type GetAssetData = {
uri: string;
};
@@ -1424,6 +1477,33 @@ export type $OpenApiTs = {
};
};
};
+ "/public/assets/events": {
+ get: {
+ req: GetAssetEventsData;
+ res: {
+ /**
+ * Successful Response
+ */
+ 200: AssetEventCollectionResponse;
+ /**
+ * Unauthorized
+ */
+ 401: HTTPExceptionResponse;
+ /**
+ * Forbidden
+ */
+ 403: HTTPExceptionResponse;
+ /**
+ * Not Found
+ */
+ 404: HTTPExceptionResponse;
+ /**
+ * Validation Error
+ */
+ 422: HTTPValidationError;
+ };
+ };
+ };
"/public/assets/{uri}": {
get: {
req: GetAssetData;
diff --git a/tests/api_fastapi/core_api/routes/public/test_assets.py
b/tests/api_fastapi/core_api/routes/public/test_assets.py
index eb72c1a99ac..95ad658ba4d 100644
--- a/tests/api_fastapi/core_api/routes/public/test_assets.py
+++ b/tests/api_fastapi/core_api/routes/public/test_assets.py
@@ -21,12 +21,15 @@ import urllib
import pytest
from airflow.models import DagModel
-from airflow.models.asset import AssetModel, DagScheduleAssetReference,
TaskOutletAssetReference
+from airflow.models.asset import AssetEvent, AssetModel,
DagScheduleAssetReference, TaskOutletAssetReference
+from airflow.models.dagrun import DagRun
from airflow.utils import timezone
from airflow.utils.session import provide_session
+from airflow.utils.state import DagRunState
+from airflow.utils.types import DagRunType
from tests_common.test_utils.asserts import assert_queries_count
-from tests_common.test_utils.db import clear_db_assets
+from tests_common.test_utils.db import clear_db_assets, clear_db_runs
pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode]
@@ -52,15 +55,70 @@ def _create_provided_asset(session, asset: AssetModel) ->
None:
session.commit()
+def _create_assets_events(session, num: int = 2) -> None:
+ default_time = "2020-06-11T18:00:00+00:00"
+ assets_events = [
+ AssetEvent(
+ id=i,
+ asset_id=i,
+ extra={"foo": "bar"},
+ source_task_id="source_task_id",
+ source_dag_id="source_dag_id",
+ source_run_id=f"source_run_id_{i}",
+ timestamp=timezone.parse(default_time),
+ )
+ for i in range(1, 1 + num)
+ ]
+ session.add_all(assets_events)
+ session.commit()
+
+
+def _create_provided_asset_event(session, asset_event: AssetEvent) -> None:
+ session.add(asset_event)
+ session.commit()
+
+
+def _create_dag_run(session, num: int = 2):
+ default_time = "2020-06-11T18:00:00+00:00"
+ dag_runs = [
+ DagRun(
+ dag_id="source_dag_id",
+ run_id=f"source_run_id_{i}",
+ run_type=DagRunType.MANUAL,
+ execution_date=timezone.parse(default_time),
+ start_date=timezone.parse(default_time),
+ data_interval=(timezone.parse(default_time),
timezone.parse(default_time)),
+ external_trigger=True,
+ state=DagRunState.SUCCESS,
+ )
+ for i in range(1, 1 + num)
+ ]
+ for dag_run in dag_runs:
+ dag_run.end_date = timezone.parse(default_time)
+ session.add_all(dag_runs)
+ session.commit()
+
+
+def _create_asset_dag_run(session, num: int = 2):
+ for i in range(1, 1 + num):
+ dag_run =
session.query(DagRun).filter_by(run_id=f"source_run_id_{i}").first()
+ asset_event = session.query(AssetEvent).filter_by(id=i).first()
+ if dag_run and asset_event:
+ dag_run.consumed_asset_events.append(asset_event)
+ session.commit()
+
+
class TestAssets:
default_time = "2020-06-11T18:00:00+00:00"
@pytest.fixture(autouse=True)
def setup(self) -> None:
clear_db_assets()
+ clear_db_runs()
def teardown_method(self) -> None:
clear_db_assets()
+ clear_db_runs()
@provide_session
def create_assets(self, session, num: int = 2):
@@ -70,6 +128,22 @@ class TestAssets:
def create_provided_asset(self, session, asset: AssetModel):
_create_provided_asset(session=session, asset=asset)
+ @provide_session
+ def create_assets_events(self, session, num: int = 2):
+ _create_assets_events(session=session, num=num)
+
+ @provide_session
+ def create_provided_asset_event(self, session, asset_event: AssetEvent):
+ _create_provided_asset_event(session=session, asset_event=asset_event)
+
+ @provide_session
+ def create_dag_run(self, session, num: int = 2):
+ _create_dag_run(num=num, session=session)
+
+ @provide_session
+ def create_asset_dag_run(self, session, num: int = 2):
+ _create_asset_dag_run(num=num, session=session)
+
class TestGetAssets(TestAssets):
def test_should_respond_200(self, test_client, session):
@@ -234,6 +308,120 @@ class TestGetAssetsEndpointPagination(TestAssets):
assert len(response.json()["assets"]) == 100
+class TestGetAssetEvents(TestAssets):
+ def test_should_respond_200(self, test_client, session):
+ self.create_assets()
+ self.create_assets_events()
+ self.create_dag_run()
+ self.create_asset_dag_run()
+ assets = session.query(AssetEvent).all()
+ assert len(assets) == 2
+ response = test_client.get("/public/assets/events")
+ assert response.status_code == 200
+ response_data = response.json()
+ assert response_data == {
+ "asset_events": [
+ {
+ "id": 1,
+ "asset_id": 1,
+ "uri": "s3://bucket/key/1",
+ "extra": {"foo": "bar"},
+ "source_task_id": "source_task_id",
+ "source_dag_id": "source_dag_id",
+ "source_run_id": "source_run_id_1",
+ "source_map_index": -1,
+ "created_dagruns": [
+ {
+ "run_id": "source_run_id_1",
+ "dag_id": "source_dag_id",
+ "logical_date": "2020-06-11T18:00:00Z",
+ "start_date": "2020-06-11T18:00:00Z",
+ "end_date": "2020-06-11T18:00:00Z",
+ "state": "success",
+ "data_interval_start": "2020-06-11T18:00:00Z",
+ "data_interval_end": "2020-06-11T18:00:00Z",
+ }
+ ],
+ "timestamp": "2020-06-11T18:00:00Z",
+ },
+ {
+ "id": 2,
+ "asset_id": 2,
+ "uri": "s3://bucket/key/2",
+ "extra": {"foo": "bar"},
+ "source_task_id": "source_task_id",
+ "source_dag_id": "source_dag_id",
+ "source_run_id": "source_run_id_2",
+ "source_map_index": -1,
+ "created_dagruns": [
+ {
+ "run_id": "source_run_id_2",
+ "dag_id": "source_dag_id",
+ "logical_date": "2020-06-11T18:00:00Z",
+ "start_date": "2020-06-11T18:00:00Z",
+ "end_date": "2020-06-11T18:00:00Z",
+ "state": "success",
+ "data_interval_start": "2020-06-11T18:00:00Z",
+ "data_interval_end": "2020-06-11T18:00:00Z",
+ }
+ ],
+ "timestamp": "2020-06-11T18:00:00Z",
+ },
+ ],
+ "total_entries": 2,
+ }
+
+ @pytest.mark.parametrize(
+ "params, total_entries",
+ [
+ ({"asset_id": "2"}, 1),
+ ({"source_dag_id": "source_dag_id"}, 2),
+ ({"source_task_id": "source_task_id"}, 2),
+ ({"source_run_id": "source_run_id_1"}, 1),
+ ({"source_map_index": "-1"}, 2),
+ ],
+ )
+ @provide_session
+ def test_filtering(self, test_client, params, total_entries, session):
+ self.create_assets()
+ self.create_assets_events()
+ self.create_dag_run()
+ self.create_asset_dag_run()
+ response = test_client.get("/public/assets/events", params=params)
+ assert response.status_code == 200
+ assert response.json()["total_entries"] == total_entries
+
+ def test_order_by_raises_400_for_invalid_attr(self, test_client, session):
+ response = test_client.get("/public/assets/events?order_by=fake")
+
+ assert response.status_code == 400
+ msg = "Ordering with 'fake' is disallowed or the attribute does not
exist on the model"
+ assert response.json()["detail"] == msg
+
+ @pytest.mark.parametrize(
+ "params, expected_asset_uris",
+ [
+ # Limit test data
+ ({"limit": "1"}, ["s3://bucket/key/1"]),
+ ({"limit": "100"}, [f"s3://bucket/key/{i}" for i in range(1,
101)]),
+ # Offset test data
+ ({"offset": "1"}, [f"s3://bucket/key/{i}" for i in range(2, 102)]),
+ ({"offset": "3"}, [f"s3://bucket/key/{i}" for i in range(4, 104)]),
+ ],
+ )
+ def test_limit_and_offset(self, test_client, params, expected_asset_uris):
+ self.create_assets(num=110)
+ self.create_assets_events(num=110)
+ self.create_dag_run(num=110)
+ self.create_asset_dag_run(num=110)
+
+ response = test_client.get("/public/assets/events", params=params)
+
+ assert response.status_code == 200
+ asset_uris = [asset["uri"] for asset in
response.json()["asset_events"]]
+ assert asset_uris == expected_asset_uris
+
+
class TestGetAssetEndpoint(TestAssets):
@pytest.mark.parametrize(
"url",