This is an automated email from the ASF dual-hosted git repository.
gopidesu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b5f564663fe Revert "AIP-84: Migrating GET queued asset events for DAG
to fastAPI (#43934)" (#44088)
b5f564663fe is described below
commit b5f564663fe7fcf963ff3cb5c3c6dc89688bf121
Author: Jarek Potiuk <[email protected]>
AuthorDate: Sat Nov 16 08:30:50 2024 +0100
Revert "AIP-84: Migrating GET queued asset events for DAG to fastAPI
(#43934)" (#44088)
This reverts commit 391773018747b37cfa5dab0dcfc8b22d763e38e6.
---
airflow/api_connexion/endpoints/asset_endpoint.py | 1 -
airflow/api_fastapi/common/parameters.py | 26 +-----
airflow/api_fastapi/core_api/datamodels/assets.py | 15 ---
.../api_fastapi/core_api/openapi/v1-generated.yaml | 102 +--------------------
.../api_fastapi/core_api/routes/public/assets.py | 79 +---------------
airflow/ui/openapi-gen/queries/common.ts | 22 -----
airflow/ui/openapi-gen/queries/prefetch.ts | 26 ------
airflow/ui/openapi-gen/queries/queries.ts | 33 -------
airflow/ui/openapi-gen/queries/suspense.ts | 33 -------
airflow/ui/openapi-gen/requests/schemas.gen.ts | 42 ---------
airflow/ui/openapi-gen/requests/services.gen.ts | 36 +-------
airflow/ui/openapi-gen/requests/types.gen.ts | 55 +----------
.../core_api/routes/public/test_assets.py | 54 +----------
13 files changed, 17 insertions(+), 507 deletions(-)
diff --git a/airflow/api_connexion/endpoints/asset_endpoint.py
b/airflow/api_connexion/endpoints/asset_endpoint.py
index ff47db88387..7915bf8b034 100644
--- a/airflow/api_connexion/endpoints/asset_endpoint.py
+++ b/airflow/api_connexion/endpoints/asset_endpoint.py
@@ -222,7 +222,6 @@ def delete_dag_asset_queued_event(
)
-@mark_fastapi_migration_done
@security.requires_access_asset("GET")
@security.requires_access_dag("GET")
@provide_session
diff --git a/airflow/api_fastapi/common/parameters.py
b/airflow/api_fastapi/common/parameters.py
index c573996eafd..337d85547c3 100644
--- a/airflow/api_fastapi/common/parameters.py
+++ b/airflow/api_fastapi/common/parameters.py
@@ -19,7 +19,7 @@ from __future__ import annotations
from abc import ABC, abstractmethod
from datetime import datetime
-from typing import TYPE_CHECKING, Annotated, Any, Callable, Generic, List,
Optional, TypeVar, Union, overload
+from typing import TYPE_CHECKING, Annotated, Any, Callable, Generic, List,
Optional, TypeVar
from fastapi import Depends, HTTPException, Query
from pendulum.parsing.exceptions import ParserError
@@ -409,27 +409,6 @@ def _safe_parse_datetime(date_to_check: str) -> datetime:
"""
if not date_to_check:
raise ValueError(f"{date_to_check} cannot be None.")
- return _safe_parse_datetime_optional(date_to_check)
-
-
-@overload
-def _safe_parse_datetime_optional(date_to_check: str) -> datetime: ...
-
-
-@overload
-def _safe_parse_datetime_optional(date_to_check: None) -> None: ...
-
-
-def _safe_parse_datetime_optional(date_to_check: str | None) -> datetime |
None:
- """
- Parse datetime and raise error for invalid dates.
-
- Allow None values.
-
- :param date_to_check: the string value to be parsed
- """
- if date_to_check is None:
- return None
try:
return timezone.parse(date_to_check, strict=True)
except (TypeError, ParserError):
@@ -635,8 +614,7 @@ def float_range_filter_factory(
# Common Safe DateTime
-DateTimeQuery = Annotated[datetime, AfterValidator(_safe_parse_datetime)]
-OptionalDateTimeQuery = Annotated[Union[datetime, None],
AfterValidator(_safe_parse_datetime_optional)]
+DateTimeQuery = Annotated[str, AfterValidator(_safe_parse_datetime)]
# DAG
QueryLimit = Annotated[LimitFilter, Depends(LimitFilter().depends)]
diff --git a/airflow/api_fastapi/core_api/datamodels/assets.py
b/airflow/api_fastapi/core_api/datamodels/assets.py
index bfdbb2d7fc8..e5ac10715ed 100644
--- a/airflow/api_fastapi/core_api/datamodels/assets.py
+++ b/airflow/api_fastapi/core_api/datamodels/assets.py
@@ -101,21 +101,6 @@ class AssetEventCollectionResponse(BaseModel):
total_entries: int
-class QueuedEventResponse(BaseModel):
- """Queued Event serializer for responses.."""
-
- uri: str
- dag_id: str
- created_at: datetime
-
-
-class QueuedEventCollectionResponse(BaseModel):
- """Queued Event Collection serializer for responses."""
-
- queued_events: list[QueuedEventResponse]
- total_entries: int
-
-
class CreateAssetEventsBody(BaseModel):
"""Create asset events request."""
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index bdf1b8aef1b..e7762392a0c 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -47,14 +47,12 @@ paths:
required: true
schema:
type: string
- format: date-time
title: Start Date
- name: end_date
in: query
required: true
schema:
type: string
- format: date-time
title: End Date
responses:
'200':
@@ -172,7 +170,7 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
- /public/assets:
+ /public/assets/:
get:
tags:
- Asset
@@ -348,7 +346,6 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
- /public/events:
post:
tags:
- Asset
@@ -356,11 +353,11 @@ paths:
description: Create asset events.
operationId: create_asset_event
requestBody:
+ required: true
content:
application/json:
schema:
$ref: '#/components/schemas/CreateAssetEventsBody'
- required: true
responses:
'200':
description: Successful Response
@@ -369,23 +366,23 @@ paths:
schema:
$ref: '#/components/schemas/AssetEventResponse'
'401':
- description: Unauthorized
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Unauthorized
'403':
- description: Forbidden
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Forbidden
'404':
- description: Not Found
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Not Found
'422':
description: Validation Error
content:
@@ -437,60 +434,6 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
- /public/dags/{dag_id}/assets/queuedEvent:
- get:
- tags:
- - Asset
- summary: Get Dag Asset Queued Events
- description: Get queued asset events for a DAG.
- operationId: get_dag_asset_queued_events
- parameters:
- - name: dag_id
- in: path
- required: true
- schema:
- type: string
- title: Dag Id
- - name: before
- in: query
- required: false
- schema:
- anyOf:
- - type: string
- format: date-time
- - type: 'null'
- title: Before
- responses:
- '200':
- description: Successful Response
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/QueuedEventCollectionResponse'
- '401':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPExceptionResponse'
- description: Unauthorized
- '403':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPExceptionResponse'
- description: Forbidden
- '404':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPExceptionResponse'
- description: Not Found
- '422':
- description: Validation Error
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPValidationError'
/public/backfills/:
get:
tags:
@@ -5787,41 +5730,6 @@ components:
- version
title: ProviderResponse
description: Provider serializer for responses.
- QueuedEventCollectionResponse:
- properties:
- queued_events:
- items:
- $ref: '#/components/schemas/QueuedEventResponse'
- type: array
- title: Queued Events
- total_entries:
- type: integer
- title: Total Entries
- type: object
- required:
- - queued_events
- - total_entries
- title: QueuedEventCollectionResponse
- description: Queued Event Collection serializer for responses.
- QueuedEventResponse:
- properties:
- uri:
- type: string
- title: Uri
- dag_id:
- type: string
- title: Dag Id
- created_at:
- type: string
- format: date-time
- title: Created At
- type: object
- required:
- - uri
- - dag_id
- - created_at
- title: QueuedEventResponse
- description: Queued Event serializer for responses..
ReprocessBehavior:
type: string
enum:
diff --git a/airflow/api_fastapi/core_api/routes/public/assets.py
b/airflow/api_fastapi/core_api/routes/public/assets.py
index 0900b040098..326a387f008 100644
--- a/airflow/api_fastapi/core_api/routes/public/assets.py
+++ b/airflow/api_fastapi/core_api/routes/public/assets.py
@@ -17,7 +17,6 @@
from __future__ import annotations
-from datetime import datetime
from typing import Annotated
from fastapi import Depends, HTTPException, status
@@ -26,7 +25,6 @@ from sqlalchemy.orm import Session, joinedload, subqueryload
from airflow.api_fastapi.common.db.common import get_session, paginated_select
from airflow.api_fastapi.common.parameters import (
- OptionalDateTimeQuery,
QueryAssetDagIdPatternSearch,
QueryAssetIdFilter,
QueryLimit,
@@ -45,41 +43,18 @@ from airflow.api_fastapi.core_api.datamodels.assets import (
AssetEventResponse,
AssetResponse,
CreateAssetEventsBody,
- QueuedEventCollectionResponse,
- QueuedEventResponse,
)
from airflow.api_fastapi.core_api.openapi.exceptions import
create_openapi_http_exception_doc
from airflow.assets import Asset
from airflow.assets.manager import asset_manager
-from airflow.models.asset import AssetDagRunQueue, AssetEvent, AssetModel
+from airflow.models.asset import AssetEvent, AssetModel
from airflow.utils import timezone
-assets_router = AirflowRouter(tags=["Asset"])
-
-
-def _generate_queued_event_where_clause(
- *,
- dag_id: str | None = None,
- uri: str | None = None,
- before: datetime | None = None,
-) -> list:
- """Get AssetDagRunQueue where clause."""
- where_clause = []
- if dag_id is not None:
- where_clause.append(AssetDagRunQueue.target_dag_id == dag_id)
- if uri is not None:
- where_clause.append(
- AssetDagRunQueue.asset_id.in_(
- select(AssetModel.id).where(AssetModel.uri == uri),
- ),
- )
- if before is not None:
- where_clause.append(AssetDagRunQueue.created_at < before)
- return where_clause
+assets_router = AirflowRouter(tags=["Asset"], prefix="/assets")
@assets_router.get(
- "/assets",
+ "/",
responses=create_openapi_http_exception_doc([401, 403, 404]),
)
def get_assets(
@@ -114,7 +89,7 @@ def get_assets(
@assets_router.get(
- "/assets/events",
+ "/events",
responses=create_openapi_http_exception_doc([404]),
)
def get_asset_events(
@@ -190,7 +165,7 @@ def create_asset_event(
@assets_router.get(
- "/assets/{uri:path}",
+ "/{uri:path}",
responses=create_openapi_http_exception_doc([401, 403, 404]),
)
def get_asset(
@@ -208,47 +183,3 @@ def get_asset(
raise HTTPException(status.HTTP_404_NOT_FOUND, f"The Asset with uri:
`{uri}` was not found")
return AssetResponse.model_validate(asset, from_attributes=True)
-
-
-@assets_router.get(
- "/dags/{dag_id}/assets/queuedEvent",
- responses=create_openapi_http_exception_doc(
- [
- status.HTTP_404_NOT_FOUND,
- ]
- ),
-)
-def get_dag_asset_queued_events(
- dag_id: str,
- session: Annotated[Session, Depends(get_session)],
- before: OptionalDateTimeQuery = None,
-) -> QueuedEventCollectionResponse:
- """Get queued asset events for a DAG."""
- where_clause = _generate_queued_event_where_clause(dag_id=dag_id,
before=before)
- query = (
- select(AssetDagRunQueue, AssetModel.uri)
- .join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id)
- .where(*where_clause)
- )
-
- dag_asset_queued_events_select, total_entries = paginated_select(
- query,
- [],
- )
- adrqs = session.execute(dag_asset_queued_events_select).all()
-
- if not adrqs:
- raise HTTPException(status.HTTP_404_NOT_FOUND, f"Queue event with
dag_id: `{dag_id}` was not found")
-
- queued_events = [
- QueuedEventResponse(created_at=adrq.created_at,
dag_id=adrq.target_dag_id, uri=uri)
- for adrq, uri in adrqs
- ]
-
- return QueuedEventCollectionResponse(
- queued_events=[
- QueuedEventResponse.model_validate(queued_event,
from_attributes=True)
- for queued_event in queued_events
- ],
- total_entries=total_entries,
- )
diff --git a/airflow/ui/openapi-gen/queries/common.ts
b/airflow/ui/openapi-gen/queries/common.ts
index 7b23e33f0ab..46940bfd318 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -129,28 +129,6 @@ export const UseAssetServiceGetAssetKeyFn = (
},
queryKey?: Array<unknown>,
) => [useAssetServiceGetAssetKey, ...(queryKey ?? [{ uri }])];
-export type AssetServiceGetDagAssetQueuedEventsDefaultResponse = Awaited<
- ReturnType<typeof AssetService.getDagAssetQueuedEvents>
->;
-export type AssetServiceGetDagAssetQueuedEventsQueryResult<
- TData = AssetServiceGetDagAssetQueuedEventsDefaultResponse,
- TError = unknown,
-> = UseQueryResult<TData, TError>;
-export const useAssetServiceGetDagAssetQueuedEventsKey =
- "AssetServiceGetDagAssetQueuedEvents";
-export const UseAssetServiceGetDagAssetQueuedEventsKeyFn = (
- {
- before,
- dagId,
- }: {
- before?: string;
- dagId: string;
- },
- queryKey?: Array<unknown>,
-) => [
- useAssetServiceGetDagAssetQueuedEventsKey,
- ...(queryKey ?? [{ before, dagId }]),
-];
export type DashboardServiceHistoricalMetricsDefaultResponse = Awaited<
ReturnType<typeof DashboardService.historicalMetrics>
>;
diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts
b/airflow/ui/openapi-gen/queries/prefetch.ts
index 0c522f36e43..4c541670258 100644
--- a/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -165,32 +165,6 @@ export const prefetchUseAssetServiceGetAsset = (
queryKey: Common.UseAssetServiceGetAssetKeyFn({ uri }),
queryFn: () => AssetService.getAsset({ uri }),
});
-/**
- * Get Dag Asset Queued Events
- * Get queued asset events for a DAG.
- * @param data The data for the request.
- * @param data.dagId
- * @param data.before
- * @returns QueuedEventCollectionResponse Successful Response
- * @throws ApiError
- */
-export const prefetchUseAssetServiceGetDagAssetQueuedEvents = (
- queryClient: QueryClient,
- {
- before,
- dagId,
- }: {
- before?: string;
- dagId: string;
- },
-) =>
- queryClient.prefetchQuery({
- queryKey: Common.UseAssetServiceGetDagAssetQueuedEventsKeyFn({
- before,
- dagId,
- }),
- queryFn: () => AssetService.getDagAssetQueuedEvents({ before, dagId }),
- });
/**
* Historical Metrics
* Return cluster activity historical metrics.
diff --git a/airflow/ui/openapi-gen/queries/queries.ts
b/airflow/ui/openapi-gen/queries/queries.ts
index 8ec0ea9234a..a96b09e12a7 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -213,39 +213,6 @@ export const useAssetServiceGetAsset = <
queryFn: () => AssetService.getAsset({ uri }) as TData,
...options,
});
-/**
- * Get Dag Asset Queued Events
- * Get queued asset events for a DAG.
- * @param data The data for the request.
- * @param data.dagId
- * @param data.before
- * @returns QueuedEventCollectionResponse Successful Response
- * @throws ApiError
- */
-export const useAssetServiceGetDagAssetQueuedEvents = <
- TData = Common.AssetServiceGetDagAssetQueuedEventsDefaultResponse,
- TError = unknown,
- TQueryKey extends Array<unknown> = unknown[],
->(
- {
- before,
- dagId,
- }: {
- before?: string;
- dagId: string;
- },
- queryKey?: TQueryKey,
- options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
-) =>
- useQuery<TData, TError>({
- queryKey: Common.UseAssetServiceGetDagAssetQueuedEventsKeyFn(
- { before, dagId },
- queryKey,
- ),
- queryFn: () =>
- AssetService.getDagAssetQueuedEvents({ before, dagId }) as TData,
- ...options,
- });
/**
* Historical Metrics
* Return cluster activity historical metrics.
diff --git a/airflow/ui/openapi-gen/queries/suspense.ts
b/airflow/ui/openapi-gen/queries/suspense.ts
index 1b814222815..43331b187fe 100644
--- a/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow/ui/openapi-gen/queries/suspense.ts
@@ -195,39 +195,6 @@ export const useAssetServiceGetAssetSuspense = <
queryFn: () => AssetService.getAsset({ uri }) as TData,
...options,
});
-/**
- * Get Dag Asset Queued Events
- * Get queued asset events for a DAG.
- * @param data The data for the request.
- * @param data.dagId
- * @param data.before
- * @returns QueuedEventCollectionResponse Successful Response
- * @throws ApiError
- */
-export const useAssetServiceGetDagAssetQueuedEventsSuspense = <
- TData = Common.AssetServiceGetDagAssetQueuedEventsDefaultResponse,
- TError = unknown,
- TQueryKey extends Array<unknown> = unknown[],
->(
- {
- before,
- dagId,
- }: {
- before?: string;
- dagId: string;
- },
- queryKey?: TQueryKey,
- options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
-) =>
- useSuspenseQuery<TData, TError>({
- queryKey: Common.UseAssetServiceGetDagAssetQueuedEventsKeyFn(
- { before, dagId },
- queryKey,
- ),
- queryFn: () =>
- AssetService.getDagAssetQueuedEvents({ before, dagId }) as TData,
- ...options,
- });
/**
* Historical Metrics
* Return cluster activity historical metrics.
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 1f83e434286..e5ac0441a2a 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -2880,48 +2880,6 @@ export const $ProviderResponse = {
description: "Provider serializer for responses.",
} as const;
-export const $QueuedEventCollectionResponse = {
- properties: {
- queued_events: {
- items: {
- $ref: "#/components/schemas/QueuedEventResponse",
- },
- type: "array",
- title: "Queued Events",
- },
- total_entries: {
- type: "integer",
- title: "Total Entries",
- },
- },
- type: "object",
- required: ["queued_events", "total_entries"],
- title: "QueuedEventCollectionResponse",
- description: "Queued Event Collection serializer for responses.",
-} as const;
-
-export const $QueuedEventResponse = {
- properties: {
- uri: {
- type: "string",
- title: "Uri",
- },
- dag_id: {
- type: "string",
- title: "Dag Id",
- },
- created_at: {
- type: "string",
- format: "date-time",
- title: "Created At",
- },
- },
- type: "object",
- required: ["uri", "dag_id", "created_at"],
- title: "QueuedEventResponse",
- description: "Queued Event serializer for responses..",
-} as const;
-
export const $ReprocessBehavior = {
type: "string",
enum: ["failed", "completed", "none"],
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts
b/airflow/ui/openapi-gen/requests/services.gen.ts
index c39dce38d34..53bb3527d14 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -13,8 +13,6 @@ import type {
CreateAssetEventResponse,
GetAssetData,
GetAssetResponse,
- GetDagAssetQueuedEventsData,
- GetDagAssetQueuedEventsResponse,
HistoricalMetricsData,
HistoricalMetricsResponse,
RecentDagRunsData,
@@ -169,7 +167,7 @@ export class AssetService {
): CancelablePromise<GetAssetsResponse> {
return __request(OpenAPI, {
method: "GET",
- url: "/public/assets",
+ url: "/public/assets/",
query: {
limit: data.limit,
offset: data.offset,
@@ -239,7 +237,7 @@ export class AssetService {
): CancelablePromise<CreateAssetEventResponse> {
return __request(OpenAPI, {
method: "POST",
- url: "/public/events",
+ url: "/public/assets/events",
body: data.requestBody,
mediaType: "application/json",
errors: {
@@ -276,36 +274,6 @@ export class AssetService {
},
});
}
-
- /**
- * Get Dag Asset Queued Events
- * Get queued asset events for a DAG.
- * @param data The data for the request.
- * @param data.dagId
- * @param data.before
- * @returns QueuedEventCollectionResponse Successful Response
- * @throws ApiError
- */
- public static getDagAssetQueuedEvents(
- data: GetDagAssetQueuedEventsData,
- ): CancelablePromise<GetDagAssetQueuedEventsResponse> {
- return __request(OpenAPI, {
- method: "GET",
- url: "/public/dags/{dag_id}/assets/queuedEvent",
- path: {
- dag_id: data.dagId,
- },
- query: {
- before: data.before,
- },
- errors: {
- 401: "Unauthorized",
- 403: "Forbidden",
- 404: "Not Found",
- 422: "Validation Error",
- },
- });
- }
}
export class DashboardService {
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow/ui/openapi-gen/requests/types.gen.ts
index 96d6b812897..078699cc0f2 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -712,23 +712,6 @@ export type ProviderResponse = {
version: string;
};
-/**
- * Queued Event Collection serializer for responses.
- */
-export type QueuedEventCollectionResponse = {
- queued_events: Array<QueuedEventResponse>;
- total_entries: number;
-};
-
-/**
- * Queued Event serializer for responses..
- */
-export type QueuedEventResponse = {
- uri: string;
- dag_id: string;
- created_at: string;
-};
-
/**
* Internal enum for setting reprocess behavior in a backfill.
*
@@ -1062,13 +1045,6 @@ export type GetAssetData = {
export type GetAssetResponse = AssetResponse;
-export type GetDagAssetQueuedEventsData = {
- before?: string | null;
- dagId: string;
-};
-
-export type GetDagAssetQueuedEventsResponse = QueuedEventCollectionResponse;
-
export type HistoricalMetricsData = {
endDate: string;
startDate: string;
@@ -1559,7 +1535,7 @@ export type $OpenApiTs = {
};
};
};
- "/public/assets": {
+ "/public/assets/": {
get: {
req: GetAssetsData;
res: {
@@ -1612,8 +1588,6 @@ export type $OpenApiTs = {
422: HTTPValidationError;
};
};
- };
- "/public/events": {
post: {
req: CreateAssetEventData;
res: {
@@ -1667,33 +1641,6 @@ export type $OpenApiTs = {
};
};
};
- "/public/dags/{dag_id}/assets/queuedEvent": {
- get: {
- req: GetDagAssetQueuedEventsData;
- res: {
- /**
- * Successful Response
- */
- 200: QueuedEventCollectionResponse;
- /**
- * Unauthorized
- */
- 401: HTTPExceptionResponse;
- /**
- * Forbidden
- */
- 403: HTTPExceptionResponse;
- /**
- * Not Found
- */
- 404: HTTPExceptionResponse;
- /**
- * Validation Error
- */
- 422: HTTPValidationError;
- };
- };
- };
"/ui/dashboard/historical_metrics_data": {
get: {
req: HistoricalMetricsData;
diff --git a/tests/api_fastapi/core_api/routes/public/test_assets.py
b/tests/api_fastapi/core_api/routes/public/test_assets.py
index b5744e47edf..42b7acd908f 100644
--- a/tests/api_fastapi/core_api/routes/public/test_assets.py
+++ b/tests/api_fastapi/core_api/routes/public/test_assets.py
@@ -24,13 +24,7 @@ import pytest
import time_machine
from airflow.models import DagModel
-from airflow.models.asset import (
- AssetDagRunQueue,
- AssetEvent,
- AssetModel,
- DagScheduleAssetReference,
- TaskOutletAssetReference,
-)
+from airflow.models.asset import AssetEvent, AssetModel,
DagScheduleAssetReference, TaskOutletAssetReference
from airflow.models.dagrun import DagRun
from airflow.utils import timezone
from airflow.utils.session import provide_session
@@ -470,7 +464,7 @@ class TestGetAssetEndpoint(TestAssets):
assert response.json()["detail"] == "The Asset with uri:
`s3://bucket/key` was not found"
-class TestQueuedEventEndpoint(TestAssets):
+class TestPostAssetEvents(TestAssets):
@pytest.fixture
def time_freezer(self) -> Generator:
freezer = time_machine.travel(self.default_time, tick=False)
@@ -480,50 +474,6 @@ class TestQueuedEventEndpoint(TestAssets):
freezer.stop()
- def _create_asset_dag_run_queues(self, dag_id, asset_id, session):
- adrq = AssetDagRunQueue(target_dag_id=dag_id, asset_id=asset_id)
- session.add(adrq)
- session.commit()
- return adrq
-
-
-class TestGetDagAssetQueuedEvents(TestQueuedEventEndpoint):
- @pytest.mark.usefixtures("time_freezer")
- def test_should_respond_200(self, test_client, session, create_dummy_dag):
- dag, _ = create_dummy_dag()
- dag_id = dag.dag_id
- self.create_assets(session=session, num=1)
- asset_id = 1
- self._create_asset_dag_run_queues(dag_id, asset_id, session)
-
- response = test_client.get(
- f"/public/dags/{dag_id}/assets/queuedEvent",
- )
-
- assert response.status_code == 200
- assert response.json() == {
- "queued_events": [
- {
- "created_at": self.default_time.replace("+00:00", "Z"),
- "uri": "s3://bucket/key/1",
- "dag_id": "dag",
- }
- ],
- "total_entries": 1,
- }
-
- def test_should_respond_404(self, test_client):
- dag_id = "not_exists"
-
- response = test_client.get(
- f"/public/dags/{dag_id}/assets/queuedEvent",
- )
-
- assert response.status_code == 404
- assert response.json()["detail"] == "Queue event with dag_id:
`not_exists` was not found"
-
-
-class TestPostAssetEvents(TestAssets):
@pytest.mark.usefixtures("time_freezer")
def test_should_respond_200(self, test_client, session):
self.create_assets()