This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 238b2093f64 Implement asset alias list endpoint in FastAPI (#44777)
238b2093f64 is described below
commit 238b2093f64777ec0d1bbb22de495116d7260fa4
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Tue Dec 10 05:11:19 2024 +0800
Implement asset alias list endpoint in FastAPI (#44777)
A new name_pattern filter is also added to the existing asset list endpoint,
and is supported by the new asset alias list endpoint as well.
---
airflow/api_fastapi/common/parameters.py | 13 +-
airflow/api_fastapi/core_api/datamodels/assets.py | 25 ++--
.../api_fastapi/core_api/openapi/v1-generated.yaml | 102 +++++++++++++-
.../api_fastapi/core_api/routes/public/assets.py | 40 +++++-
.../cli/commands/remote_commands/asset_command.py | 6 +-
airflow/ui/openapi-gen/queries/common.ts | 31 ++++-
airflow/ui/openapi-gen/queries/prefetch.ts | 48 ++++++-
airflow/ui/openapi-gen/queries/queries.ts | 50 ++++++-
airflow/ui/openapi-gen/queries/suspense.ts | 50 ++++++-
airflow/ui/openapi-gen/requests/schemas.gen.ts | 28 +++-
airflow/ui/openapi-gen/requests/services.gen.ts | 36 +++++
airflow/ui/openapi-gen/requests/types.gen.ts | 51 ++++++-
.../core_api/routes/public/test_assets.py | 154 +++++++++++++++++++++
13 files changed, 603 insertions(+), 31 deletions(-)
diff --git a/airflow/api_fastapi/common/parameters.py
b/airflow/api_fastapi/common/parameters.py
index 7d5e4c63416..dbaf4581ac1 100644
--- a/airflow/api_fastapi/common/parameters.py
+++ b/airflow/api_fastapi/common/parameters.py
@@ -40,7 +40,12 @@ from sqlalchemy import Column, case, or_
from sqlalchemy.inspection import inspect
from airflow.models import Base
-from airflow.models.asset import AssetModel, DagScheduleAssetReference,
TaskOutletAssetReference
+from airflow.models.asset import (
+ AssetAliasModel,
+ AssetModel,
+ DagScheduleAssetReference,
+ TaskOutletAssetReference,
+)
from airflow.models.dag import DagModel, DagTag
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
@@ -568,7 +573,13 @@ QueryTIExecutorFilter = Annotated[
]
# Assets
+QueryAssetNamePatternSearch = Annotated[
+ _SearchParam, Depends(search_param_factory(AssetModel.name,
"name_pattern"))
+]
QueryUriPatternSearch = Annotated[_SearchParam,
Depends(search_param_factory(AssetModel.uri, "uri_pattern"))]
+QueryAssetAliasNamePatternSearch = Annotated[
+ _SearchParam, Depends(search_param_factory(AssetAliasModel.name,
"name_pattern"))
+]
QueryAssetDagIdPatternSearch = Annotated[
_DagIdAssetReferenceFilter, Depends(_DagIdAssetReferenceFilter().depends)
]
diff --git a/airflow/api_fastapi/core_api/datamodels/assets.py
b/airflow/api_fastapi/core_api/datamodels/assets.py
index 72bba200fab..047f3f557bc 100644
--- a/airflow/api_fastapi/core_api/datamodels/assets.py
+++ b/airflow/api_fastapi/core_api/datamodels/assets.py
@@ -42,14 +42,6 @@ class TaskOutletAssetReference(BaseModel):
updated_at: datetime
-class AssetAliasSchema(BaseModel):
- """Asset alias serializer for assets."""
-
- id: int
- name: str
- group: str
-
-
class AssetResponse(BaseModel):
"""Asset serializer for responses."""
@@ -62,7 +54,7 @@ class AssetResponse(BaseModel):
updated_at: datetime
consuming_dags: list[DagScheduleAssetReference]
producing_tasks: list[TaskOutletAssetReference]
- aliases: list[AssetAliasSchema]
+ aliases: list[AssetAliasResponse]
@field_validator("extra", mode="after")
@classmethod
@@ -77,6 +69,21 @@ class AssetCollectionResponse(BaseModel):
total_entries: int
+class AssetAliasResponse(BaseModel):
+ """Asset alias serializer for responses."""
+
+ id: int
+ name: str
+ group: str
+
+
+class AssetAliasCollectionResponse(BaseModel):
+ """Asset alias collection response."""
+
+ asset_aliases: list[AssetAliasResponse]
+ total_entries: int
+
+
class DagRunAssetReference(BaseModel):
"""DAGRun serializer for asset responses."""
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index b877e8988a1..ba1dceab659 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -273,6 +273,14 @@ paths:
minimum: 0
default: 0
title: Offset
+ - name: name_pattern
+ in: query
+ required: false
+ schema:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Name Pattern
- name: uri_pattern
in: query
required: false
@@ -327,6 +335,76 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
+ /public/assets/aliases:
+ get:
+ tags:
+ - Asset
+ summary: Get Asset Aliases
+ description: Get asset aliases.
+ operationId: get_asset_aliases
+ parameters:
+ - name: limit
+ in: query
+ required: false
+ schema:
+ type: integer
+ minimum: 0
+ default: 100
+ title: Limit
+ - name: offset
+ in: query
+ required: false
+ schema:
+ type: integer
+ minimum: 0
+ default: 0
+ title: Offset
+ - name: name_pattern
+ in: query
+ required: false
+ schema:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Name Pattern
+ - name: order_by
+ in: query
+ required: false
+ schema:
+ type: string
+ default: id
+ title: Order By
+ responses:
+ '200':
+ description: Successful Response
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/AssetAliasCollectionResponse'
+ '401':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Unauthorized
+ '403':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Forbidden
+ '404':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Not Found
+ '422':
+ description: Validation Error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPValidationError'
/public/assets/events:
get:
tags:
@@ -5778,7 +5856,23 @@ components:
type: object
title: AppBuilderViewResponse
description: Serializer for AppBuilder View responses.
- AssetAliasSchema:
+ AssetAliasCollectionResponse:
+ properties:
+ asset_aliases:
+ items:
+ $ref: '#/components/schemas/AssetAliasResponse'
+ type: array
+ title: Asset Aliases
+ total_entries:
+ type: integer
+ title: Total Entries
+ type: object
+ required:
+ - asset_aliases
+ - total_entries
+ title: AssetAliasCollectionResponse
+ description: Asset alias collection response.
+ AssetAliasResponse:
properties:
id:
type: integer
@@ -5794,8 +5888,8 @@ components:
- id
- name
- group
- title: AssetAliasSchema
- description: Asset alias serializer for assets.
+ title: AssetAliasResponse
+ description: Asset alias serializer for responses.
AssetCollectionResponse:
properties:
assets:
@@ -5920,7 +6014,7 @@ components:
title: Producing Tasks
aliases:
items:
- $ref: '#/components/schemas/AssetAliasSchema'
+ $ref: '#/components/schemas/AssetAliasResponse'
type: array
title: Aliases
type: object
diff --git a/airflow/api_fastapi/core_api/routes/public/assets.py
b/airflow/api_fastapi/core_api/routes/public/assets.py
index 3258c4180d9..552539c0cb6 100644
--- a/airflow/api_fastapi/core_api/routes/public/assets.py
+++ b/airflow/api_fastapi/core_api/routes/public/assets.py
@@ -28,7 +28,9 @@ from airflow.api_fastapi.common.db.common import SessionDep,
paginated_select
from airflow.api_fastapi.common.parameters import (
FilterParam,
OptionalDateTimeQuery,
+ QueryAssetAliasNamePatternSearch,
QueryAssetDagIdPatternSearch,
+ QueryAssetNamePatternSearch,
QueryLimit,
QueryOffset,
QueryUriPatternSearch,
@@ -37,6 +39,7 @@ from airflow.api_fastapi.common.parameters import (
)
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.datamodels.assets import (
+ AssetAliasCollectionResponse,
AssetCollectionResponse,
AssetEventCollectionResponse,
AssetEventResponse,
@@ -47,7 +50,7 @@ from airflow.api_fastapi.core_api.datamodels.assets import (
)
from airflow.api_fastapi.core_api.openapi.exceptions import
create_openapi_http_exception_doc
from airflow.assets.manager import asset_manager
-from airflow.models.asset import AssetDagRunQueue, AssetEvent, AssetModel
+from airflow.models.asset import AssetAliasModel, AssetDagRunQueue,
AssetEvent, AssetModel
from airflow.utils import timezone
assets_router = AirflowRouter(tags=["Asset"])
@@ -81,18 +84,19 @@ def _generate_queued_event_where_clause(
def get_assets(
limit: QueryLimit,
offset: QueryOffset,
+ name_pattern: QueryAssetNamePatternSearch,
uri_pattern: QueryUriPatternSearch,
dag_ids: QueryAssetDagIdPatternSearch,
order_by: Annotated[
SortParam,
- Depends(SortParam(["id", "uri", "created_at", "updated_at"],
AssetModel).dynamic_depends()),
+ Depends(SortParam(["id", "name", "uri", "created_at", "updated_at"],
AssetModel).dynamic_depends()),
],
session: SessionDep,
) -> AssetCollectionResponse:
"""Get assets."""
assets_select, total_entries = paginated_select(
statement=select(AssetModel),
- filters=[uri_pattern, dag_ids],
+ filters=[name_pattern, uri_pattern, dag_ids],
order_by=order_by,
offset=offset,
limit=limit,
@@ -110,6 +114,36 @@ def get_assets(
)
+@assets_router.get(
+ "/assets/aliases",
+ responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+)
+def get_asset_aliases(
+ limit: QueryLimit,
+ offset: QueryOffset,
+ name_pattern: QueryAssetAliasNamePatternSearch,
+ order_by: Annotated[
+ SortParam,
+ Depends(SortParam(["id", "name"], AssetAliasModel).dynamic_depends()),
+ ],
+ session: SessionDep,
+) -> AssetAliasCollectionResponse:
+ """Get asset aliases."""
+ asset_aliases_select, total_entries = paginated_select(
+ statement=select(AssetAliasModel),
+ filters=[name_pattern],
+ order_by=order_by,
+ offset=offset,
+ limit=limit,
+ session=session,
+ )
+
+ return AssetAliasCollectionResponse(
+ asset_aliases=session.scalars(asset_aliases_select),
+ total_entries=total_entries,
+ )
+
+
@assets_router.get(
"/assets/events",
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
diff --git a/airflow/cli/commands/remote_commands/asset_command.py
b/airflow/cli/commands/remote_commands/asset_command.py
index 12d8516ff62..02fc9f7bd98 100644
--- a/airflow/cli/commands/remote_commands/asset_command.py
+++ b/airflow/cli/commands/remote_commands/asset_command.py
@@ -23,7 +23,7 @@ import typing
from sqlalchemy import select
from airflow.api.common.trigger_dag import trigger_dag
-from airflow.api_fastapi.core_api.datamodels.assets import AssetAliasSchema,
AssetResponse
+from airflow.api_fastapi.core_api.datamodels.assets import AssetAliasResponse,
AssetResponse
from airflow.api_fastapi.core_api.datamodels.dag_run import DAGRunResponse
from airflow.cli.simple_table import AirflowConsole
from airflow.models.asset import AssetAliasModel, AssetModel,
TaskOutletAssetReference
@@ -43,7 +43,7 @@ log = logging.getLogger(__name__)
def _list_asset_aliases(args, *, session: Session) -> tuple[Any,
type[BaseModel]]:
aliases =
session.scalars(select(AssetAliasModel).order_by(AssetAliasModel.name))
- return aliases, AssetAliasSchema
+ return aliases, AssetAliasResponse
def _list_assets(args, *, session: Session) -> tuple[Any, type[BaseModel]]:
@@ -77,7 +77,7 @@ def _detail_asset_alias(args, *, session: Session) ->
BaseModel:
if alias is None:
raise SystemExit(f"Asset alias with name {args.name} does not exist.")
- return AssetAliasSchema.model_validate(alias)
+ return AssetAliasResponse.model_validate(alias)
def _detail_asset(args, *, session: Session) -> BaseModel:
diff --git a/airflow/ui/openapi-gen/queries/common.ts
b/airflow/ui/openapi-gen/queries/common.ts
index c8bc4d06729..2a1c59685bf 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -59,12 +59,14 @@ export const UseAssetServiceGetAssetsKeyFn = (
{
dagIds,
limit,
+ namePattern,
offset,
orderBy,
uriPattern,
}: {
dagIds?: string[];
limit?: number;
+ namePattern?: string;
offset?: number;
orderBy?: string;
uriPattern?: string;
@@ -72,7 +74,34 @@ export const UseAssetServiceGetAssetsKeyFn = (
queryKey?: Array<unknown>,
) => [
useAssetServiceGetAssetsKey,
- ...(queryKey ?? [{ dagIds, limit, offset, orderBy, uriPattern }]),
+ ...(queryKey ?? [
+ { dagIds, limit, namePattern, offset, orderBy, uriPattern },
+ ]),
+];
+export type AssetServiceGetAssetAliasesDefaultResponse = Awaited<
+ ReturnType<typeof AssetService.getAssetAliases>
+>;
+export type AssetServiceGetAssetAliasesQueryResult<
+ TData = AssetServiceGetAssetAliasesDefaultResponse,
+ TError = unknown,
+> = UseQueryResult<TData, TError>;
+export const useAssetServiceGetAssetAliasesKey = "AssetServiceGetAssetAliases";
+export const UseAssetServiceGetAssetAliasesKeyFn = (
+ {
+ limit,
+ namePattern,
+ offset,
+ orderBy,
+ }: {
+ limit?: number;
+ namePattern?: string;
+ offset?: number;
+ orderBy?: string;
+ } = {},
+ queryKey?: Array<unknown>,
+) => [
+ useAssetServiceGetAssetAliasesKey,
+ ...(queryKey ?? [{ limit, namePattern, offset, orderBy }]),
];
export type AssetServiceGetAssetEventsDefaultResponse = Awaited<
ReturnType<typeof AssetService.getAssetEvents>
diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts
b/airflow/ui/openapi-gen/queries/prefetch.ts
index 40e7006e8f8..8e0cff0ae12 100644
--- a/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -56,6 +56,7 @@ export const prefetchUseAssetServiceNextRunAssets = (
* @param data The data for the request.
* @param data.limit
* @param data.offset
+ * @param data.namePattern
* @param data.uriPattern
* @param data.dagIds
* @param data.orderBy
@@ -67,12 +68,14 @@ export const prefetchUseAssetServiceGetAssets = (
{
dagIds,
limit,
+ namePattern,
offset,
orderBy,
uriPattern,
}: {
dagIds?: string[];
limit?: number;
+ namePattern?: string;
offset?: number;
orderBy?: string;
uriPattern?: string;
@@ -82,12 +85,55 @@ export const prefetchUseAssetServiceGetAssets = (
queryKey: Common.UseAssetServiceGetAssetsKeyFn({
dagIds,
limit,
+ namePattern,
offset,
orderBy,
uriPattern,
}),
queryFn: () =>
- AssetService.getAssets({ dagIds, limit, offset, orderBy, uriPattern }),
+ AssetService.getAssets({
+ dagIds,
+ limit,
+ namePattern,
+ offset,
+ orderBy,
+ uriPattern,
+ }),
+ });
+/**
+ * Get Asset Aliases
+ * Get asset aliases.
+ * @param data The data for the request.
+ * @param data.limit
+ * @param data.offset
+ * @param data.namePattern
+ * @param data.orderBy
+ * @returns AssetAliasCollectionResponse Successful Response
+ * @throws ApiError
+ */
+export const prefetchUseAssetServiceGetAssetAliases = (
+ queryClient: QueryClient,
+ {
+ limit,
+ namePattern,
+ offset,
+ orderBy,
+ }: {
+ limit?: number;
+ namePattern?: string;
+ offset?: number;
+ orderBy?: string;
+ } = {},
+) =>
+ queryClient.prefetchQuery({
+ queryKey: Common.UseAssetServiceGetAssetAliasesKeyFn({
+ limit,
+ namePattern,
+ offset,
+ orderBy,
+ }),
+ queryFn: () =>
+ AssetService.getAssetAliases({ limit, namePattern, offset, orderBy }),
});
/**
* Get Asset Events
diff --git a/airflow/ui/openapi-gen/queries/queries.ts
b/airflow/ui/openapi-gen/queries/queries.ts
index 624b9c38be8..a9aff498f06 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -87,6 +87,7 @@ export const useAssetServiceNextRunAssets = <
* @param data The data for the request.
* @param data.limit
* @param data.offset
+ * @param data.namePattern
* @param data.uriPattern
* @param data.dagIds
* @param data.orderBy
@@ -101,12 +102,14 @@ export const useAssetServiceGetAssets = <
{
dagIds,
limit,
+ namePattern,
offset,
orderBy,
uriPattern,
}: {
dagIds?: string[];
limit?: number;
+ namePattern?: string;
offset?: number;
orderBy?: string;
uriPattern?: string;
@@ -116,19 +119,64 @@ export const useAssetServiceGetAssets = <
) =>
useQuery<TData, TError>({
queryKey: Common.UseAssetServiceGetAssetsKeyFn(
- { dagIds, limit, offset, orderBy, uriPattern },
+ { dagIds, limit, namePattern, offset, orderBy, uriPattern },
queryKey,
),
queryFn: () =>
AssetService.getAssets({
dagIds,
limit,
+ namePattern,
offset,
orderBy,
uriPattern,
}) as TData,
...options,
});
+/**
+ * Get Asset Aliases
+ * Get asset aliases.
+ * @param data The data for the request.
+ * @param data.limit
+ * @param data.offset
+ * @param data.namePattern
+ * @param data.orderBy
+ * @returns AssetAliasCollectionResponse Successful Response
+ * @throws ApiError
+ */
+export const useAssetServiceGetAssetAliases = <
+ TData = Common.AssetServiceGetAssetAliasesDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ limit,
+ namePattern,
+ offset,
+ orderBy,
+ }: {
+ limit?: number;
+ namePattern?: string;
+ offset?: number;
+ orderBy?: string;
+ } = {},
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useQuery<TData, TError>({
+ queryKey: Common.UseAssetServiceGetAssetAliasesKeyFn(
+ { limit, namePattern, offset, orderBy },
+ queryKey,
+ ),
+ queryFn: () =>
+ AssetService.getAssetAliases({
+ limit,
+ namePattern,
+ offset,
+ orderBy,
+ }) as TData,
+ ...options,
+ });
/**
* Get Asset Events
* Get asset events.
diff --git a/airflow/ui/openapi-gen/queries/suspense.ts
b/airflow/ui/openapi-gen/queries/suspense.ts
index 250b2038a8c..f9e87f7c16a 100644
--- a/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow/ui/openapi-gen/queries/suspense.ts
@@ -62,6 +62,7 @@ export const useAssetServiceNextRunAssetsSuspense = <
* @param data The data for the request.
* @param data.limit
* @param data.offset
+ * @param data.namePattern
* @param data.uriPattern
* @param data.dagIds
* @param data.orderBy
@@ -76,12 +77,14 @@ export const useAssetServiceGetAssetsSuspense = <
{
dagIds,
limit,
+ namePattern,
offset,
orderBy,
uriPattern,
}: {
dagIds?: string[];
limit?: number;
+ namePattern?: string;
offset?: number;
orderBy?: string;
uriPattern?: string;
@@ -91,19 +94,64 @@ export const useAssetServiceGetAssetsSuspense = <
) =>
useSuspenseQuery<TData, TError>({
queryKey: Common.UseAssetServiceGetAssetsKeyFn(
- { dagIds, limit, offset, orderBy, uriPattern },
+ { dagIds, limit, namePattern, offset, orderBy, uriPattern },
queryKey,
),
queryFn: () =>
AssetService.getAssets({
dagIds,
limit,
+ namePattern,
offset,
orderBy,
uriPattern,
}) as TData,
...options,
});
+/**
+ * Get Asset Aliases
+ * Get asset aliases.
+ * @param data The data for the request.
+ * @param data.limit
+ * @param data.offset
+ * @param data.namePattern
+ * @param data.orderBy
+ * @returns AssetAliasCollectionResponse Successful Response
+ * @throws ApiError
+ */
+export const useAssetServiceGetAssetAliasesSuspense = <
+ TData = Common.AssetServiceGetAssetAliasesDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ limit,
+ namePattern,
+ offset,
+ orderBy,
+ }: {
+ limit?: number;
+ namePattern?: string;
+ offset?: number;
+ orderBy?: string;
+ } = {},
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useSuspenseQuery<TData, TError>({
+ queryKey: Common.UseAssetServiceGetAssetAliasesKeyFn(
+ { limit, namePattern, offset, orderBy },
+ queryKey,
+ ),
+ queryFn: () =>
+ AssetService.getAssetAliases({
+ limit,
+ namePattern,
+ offset,
+ orderBy,
+ }) as TData,
+ ...options,
+ });
/**
* Get Asset Events
* Get asset events.
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index c745b04fbd0..622ee94d697 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -89,7 +89,27 @@ export const $AppBuilderViewResponse = {
description: "Serializer for AppBuilder View responses.",
} as const;
-export const $AssetAliasSchema = {
+export const $AssetAliasCollectionResponse = {
+ properties: {
+ asset_aliases: {
+ items: {
+ $ref: "#/components/schemas/AssetAliasResponse",
+ },
+ type: "array",
+ title: "Asset Aliases",
+ },
+ total_entries: {
+ type: "integer",
+ title: "Total Entries",
+ },
+ },
+ type: "object",
+ required: ["asset_aliases", "total_entries"],
+ title: "AssetAliasCollectionResponse",
+ description: "Asset alias collection response.",
+} as const;
+
+export const $AssetAliasResponse = {
properties: {
id: {
type: "integer",
@@ -106,8 +126,8 @@ export const $AssetAliasSchema = {
},
type: "object",
required: ["id", "name", "group"],
- title: "AssetAliasSchema",
- description: "Asset alias serializer for assets.",
+ title: "AssetAliasResponse",
+ description: "Asset alias serializer for responses.",
} as const;
export const $AssetCollectionResponse = {
@@ -293,7 +313,7 @@ export const $AssetResponse = {
},
aliases: {
items: {
- $ref: "#/components/schemas/AssetAliasSchema",
+ $ref: "#/components/schemas/AssetAliasResponse",
},
type: "array",
title: "Aliases",
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts
b/airflow/ui/openapi-gen/requests/services.gen.ts
index 4bec27ae465..b6da4328d0e 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -7,6 +7,8 @@ import type {
NextRunAssetsResponse,
GetAssetsData,
GetAssetsResponse,
+ GetAssetAliasesData,
+ GetAssetAliasesResponse,
GetAssetEventsData,
GetAssetEventsResponse,
CreateAssetEventData,
@@ -209,6 +211,7 @@ export class AssetService {
* @param data The data for the request.
* @param data.limit
* @param data.offset
+ * @param data.namePattern
* @param data.uriPattern
* @param data.dagIds
* @param data.orderBy
@@ -224,6 +227,7 @@ export class AssetService {
query: {
limit: data.limit,
offset: data.offset,
+ name_pattern: data.namePattern,
uri_pattern: data.uriPattern,
dag_ids: data.dagIds,
order_by: data.orderBy,
@@ -237,6 +241,38 @@ export class AssetService {
});
}
+ /**
+ * Get Asset Aliases
+ * Get asset aliases.
+ * @param data The data for the request.
+ * @param data.limit
+ * @param data.offset
+ * @param data.namePattern
+ * @param data.orderBy
+ * @returns AssetAliasCollectionResponse Successful Response
+ * @throws ApiError
+ */
+ public static getAssetAliases(
+ data: GetAssetAliasesData = {},
+ ): CancelablePromise<GetAssetAliasesResponse> {
+ return __request(OpenAPI, {
+ method: "GET",
+ url: "/public/assets/aliases",
+ query: {
+ limit: data.limit,
+ offset: data.offset,
+ name_pattern: data.namePattern,
+ order_by: data.orderBy,
+ },
+ errors: {
+ 401: "Unauthorized",
+ 403: "Forbidden",
+ 404: "Not Found",
+ 422: "Validation Error",
+ },
+ });
+ }
+
/**
* Get Asset Events
* Get asset events.
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow/ui/openapi-gen/requests/types.gen.ts
index 2831e7225f7..155224011cf 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -22,9 +22,17 @@ export type AppBuilderViewResponse = {
};
/**
- * Asset alias serializer for assets.
+ * Asset alias collection response.
*/
-export type AssetAliasSchema = {
+export type AssetAliasCollectionResponse = {
+ asset_aliases: Array<AssetAliasResponse>;
+ total_entries: number;
+};
+
+/**
+ * Asset alias serializer for responses.
+ */
+export type AssetAliasResponse = {
id: number;
name: string;
group: string;
@@ -79,7 +87,7 @@ export type AssetResponse = {
updated_at: string;
consuming_dags: Array<DagScheduleAssetReference>;
producing_tasks: Array<TaskOutletAssetReference>;
- aliases: Array<AssetAliasSchema>;
+ aliases: Array<AssetAliasResponse>;
};
/**
@@ -1302,6 +1310,7 @@ export type NextRunAssetsResponse = {
export type GetAssetsData = {
dagIds?: Array<string>;
limit?: number;
+ namePattern?: string | null;
offset?: number;
orderBy?: string;
uriPattern?: string | null;
@@ -1309,6 +1318,15 @@ export type GetAssetsData = {
export type GetAssetsResponse = AssetCollectionResponse;
+export type GetAssetAliasesData = {
+ limit?: number;
+ namePattern?: string | null;
+ offset?: number;
+ orderBy?: string;
+};
+
+export type GetAssetAliasesResponse = AssetAliasCollectionResponse;
+
export type GetAssetEventsData = {
assetId?: number | null;
limit?: number;
@@ -2095,6 +2113,33 @@ export type $OpenApiTs = {
};
};
};
+ "/public/assets/aliases": {
+ get: {
+ req: GetAssetAliasesData;
+ res: {
+ /**
+ * Successful Response
+ */
+ 200: AssetAliasCollectionResponse;
+ /**
+ * Unauthorized
+ */
+ 401: HTTPExceptionResponse;
+ /**
+ * Forbidden
+ */
+ 403: HTTPExceptionResponse;
+ /**
+ * Not Found
+ */
+ 404: HTTPExceptionResponse;
+ /**
+ * Validation Error
+ */
+ 422: HTTPValidationError;
+ };
+ };
+ };
"/public/assets/events": {
get: {
req: GetAssetEventsData;
diff --git a/tests/api_fastapi/core_api/routes/public/test_assets.py
b/tests/api_fastapi/core_api/routes/public/test_assets.py
index 9218cbbf820..e773f2620b8 100644
--- a/tests/api_fastapi/core_api/routes/public/test_assets.py
+++ b/tests/api_fastapi/core_api/routes/public/test_assets.py
@@ -26,6 +26,7 @@ import time_machine
from airflow.models import DagModel
from airflow.models.asset import (
+ AssetAliasModel,
AssetDagRunQueue,
AssetEvent,
AssetModel,
@@ -86,6 +87,24 @@ def _create_provided_asset(session, asset: AssetModel) ->
None:
session.commit()
+def _create_asset_aliases(session, num: int = 2) -> None:
+ asset_aliases = [
+ AssetAliasModel(
+ id=i,
+ name=f"simple{i}",
+ group="alias",
+ )
+ for i in range(1, 1 + num)
+ ]
+ session.add_all(asset_aliases)
+ session.commit()
+
+
+def _create_provided_asset_alias(session, asset_alias: AssetAliasModel) ->
None:
+ session.add(asset_alias)
+ session.commit()
+
+
def _create_assets_events(session, num: int = 2) -> None:
assets_events = [
AssetEvent(
@@ -253,6 +272,42 @@ class TestGetAssets(TestAssets):
msg = "Ordering with 'fake' is disallowed or the attribute does not
exist on the model"
assert response.json()["detail"] == msg
+ @pytest.mark.parametrize(
+ "params, expected_assets",
+ [
+ ({"name_pattern": "s3"}, {"s3://folder/key"}),
+ ({"name_pattern": "bucket"}, {"gcp://bucket/key",
"wasb://some_asset_bucket_/key"}),
+ (
+ {"name_pattern": "asset"},
+ {"somescheme://asset/key", "wasb://some_asset_bucket_/key"},
+ ),
+ (
+ {"name_pattern": ""},
+ {
+ "gcp://bucket/key",
+ "s3://folder/key",
+ "somescheme://asset/key",
+ "wasb://some_asset_bucket_/key",
+ },
+ ),
+ ],
+ )
+ @provide_session
+ def test_filter_assets_by_name_pattern_works(self, test_client, params,
expected_assets, session):
+ asset1 = AssetModel("s3-folder-key", "s3://folder/key")
+ asset2 = AssetModel("gcp-bucket-key", "gcp://bucket/key")
+ asset3 = AssetModel("some-asset-key", "somescheme://asset/key")
+ asset4 = AssetModel("wasb-some_asset_bucket_-key",
"wasb://some_asset_bucket_/key")
+
+ assets = [asset1, asset2, asset3, asset4]
+ for a in assets:
+ self.create_provided_asset(asset=a)
+
+ response = test_client.get("/public/assets", params=params)
+ assert response.status_code == 200
+ asset_urls = {asset["uri"] for asset in response.json()["assets"]}
+ assert expected_assets == asset_urls
+
@pytest.mark.parametrize(
"params, expected_assets",
[
@@ -373,6 +428,105 @@ class TestGetAssetsEndpointPagination(TestAssets):
assert len(response.json()["assets"]) == 100
+class TestAssetAliases:
+ @pytest.fixture(autouse=True)
+ def setup(self) -> None:
+ clear_db_assets()
+ clear_db_runs()
+
+ def teardown_method(self) -> None:
+ clear_db_assets()
+ clear_db_runs()
+
+ @provide_session
+ def create_asset_aliases(self, num: int = 2, *, session):
+ _create_asset_aliases(num=num, session=session)
+
+ @provide_session
+ def create_provided_asset_alias(self, asset_alias: AssetAliasModel,
session):
+ _create_provided_asset_alias(session=session, asset_alias=asset_alias)
+
+
+class TestGetAssetAliases(TestAssetAliases):
+ def test_should_respond_200(self, test_client, session):
+ self.create_asset_aliases()
+ asset_aliases = session.query(AssetAliasModel).all()
+ assert len(asset_aliases) == 2
+
+ response = test_client.get("/public/assets/aliases")
+ assert response.status_code == 200
+ response_data = response.json()
+ assert response_data == {
+ "asset_aliases": [
+ {"id": 1, "name": "simple1", "group": "alias"},
+ {"id": 2, "name": "simple2", "group": "alias"},
+ ],
+ "total_entries": 2,
+ }
+
+ def test_order_by_raises_400_for_invalid_attr(self, test_client, session):
+ response = test_client.get("/public/assets/aliases?order_by=fake")
+
+ assert response.status_code == 400
+ msg = "Ordering with 'fake' is disallowed or the attribute does not
exist on the model"
+ assert response.json()["detail"] == msg
+
+ @pytest.mark.parametrize(
+ "params, expected_asset_aliases",
+ [
+ ({"name_pattern": "foo"}, {"foo1"}),
+ ({"name_pattern": "1"}, {"foo1", "bar12"}),
+ ({"name_pattern": ""}, {"foo1", "bar12", "bar2", "bar3", "rex23"}),  # empty pattern: expect every alias (was "uri_pattern", which this endpoint does not accept)
+ ],
+ )
+ @provide_session
+ def test_filter_assets_by_name_pattern_works(self, test_client, params,
expected_asset_aliases, session):
+ asset_alias1 = AssetAliasModel(name="foo1")
+ asset_alias2 = AssetAliasModel(name="bar12")
+ asset_alias3 = AssetAliasModel(name="bar2")
+ asset_alias4 = AssetAliasModel(name="bar3")
+ asset_alias5 = AssetAliasModel(name="rex23")
+
+ asset_aliases = [asset_alias1, asset_alias2, asset_alias3,
asset_alias4, asset_alias5]
+ for a in asset_aliases:
+ self.create_provided_asset_alias(a)
+
+ response = test_client.get("/public/assets/aliases", params=params)
+ assert response.status_code == 200
+ alias_names = {asset_alias["name"] for asset_alias in
response.json()["asset_aliases"]}
+ assert expected_asset_aliases == alias_names
+
+
+class TestGetAssetAliasesEndpointPagination(TestAssetAliases):
+ @pytest.mark.parametrize(
+ "url, expected_asset_aliases",
+ [
+ # Limit test data
+ ("/public/assets/aliases?limit=1", ["simple1"]),
+ ("/public/assets/aliases?limit=100", [f"simple{i}" for i in
range(1, 101)]),
+ # Offset test data
+ ("/public/assets/aliases?offset=1", [f"simple{i}" for i in
range(2, 102)]),
+ ("/public/assets/aliases?offset=3", [f"simple{i}" for i in
range(4, 104)]),
+ # Limit and offset test data
+ ("/public/assets/aliases?offset=3&limit=3", ["simple4", "simple5",
"simple6"]),
+ ],
+ )
+ def test_limit_and_offset(self, test_client, url, expected_asset_aliases):
+ self.create_asset_aliases(num=110)
+
+ response = test_client.get(url)
+
+ assert response.status_code == 200
+ alias_names = [asset["name"] for asset in
response.json()["asset_aliases"]]
+ assert alias_names == expected_asset_aliases
+
+ def test_should_respect_page_size_limit_default(self, test_client):
+ self.create_asset_aliases(num=110)
+ response = test_client.get("/public/assets/aliases")
+ assert response.status_code == 200
+ assert len(response.json()["asset_aliases"]) == 100
+
+
class TestGetAssetEvents(TestAssets):
def test_should_respond_200(self, test_client, session):
self.create_assets()