This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 69c1f92eae Migrate the public endpoint Get DAG Stats to FastAPI (#43255)
69c1f92eae is described below
commit 69c1f92eae21ad0516c96298a210597269fddab9
Author: Omkar P <[email protected]>
AuthorDate: Tue Nov 5 21:28:11 2024 +0530
Migrate the public endpoint Get DAG Stats to FastAPI (#43255)
* Migrate public endpoint Get DAG Stats to FastAPI, with main resynced
* Re-run static checks
* Add newlines to separate entities
---
.../api_connexion/endpoints/dag_stats_endpoint.py | 2 +
airflow/api_fastapi/common/db/common.py | 5 +-
airflow/api_fastapi/common/db/dag_runs.py | 32 ++
airflow/api_fastapi/common/parameters.py | 17 +
.../api_fastapi/core_api/openapi/v1-generated.yaml | 98 +++++
.../api_fastapi/core_api/routes/public/__init__.py | 2 +
.../core_api/routes/public/dag_stats.py | 79 ++++
.../api_fastapi/core_api/serializers/dag_stats.py | 43 +++
airflow/ui/openapi-gen/queries/common.ts | 17 +
airflow/ui/openapi-gen/queries/prefetch.ts | 21 ++
airflow/ui/openapi-gen/queries/queries.ts | 27 ++
airflow/ui/openapi-gen/queries/suspense.ts | 27 ++
airflow/ui/openapi-gen/requests/schemas.gen.ts | 56 +++
airflow/ui/openapi-gen/requests/services.gen.ts | 31 ++
airflow/ui/openapi-gen/requests/types.gen.ts | 61 +++
.../core_api/routes/public/test_dag_stats.py | 416 +++++++++++++++++++++
16 files changed, 933 insertions(+), 1 deletion(-)
diff --git a/airflow/api_connexion/endpoints/dag_stats_endpoint.py
b/airflow/api_connexion/endpoints/dag_stats_endpoint.py
index 3b6c6ab8e0..c4d8701f8d 100644
--- a/airflow/api_connexion/endpoints/dag_stats_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_stats_endpoint.py
@@ -27,6 +27,7 @@ from airflow.api_connexion.schemas.dag_stats_schema import (
)
from airflow.auth.managers.models.resource_details import DagAccessEntity
from airflow.models.dag import DagRun
+from airflow.utils.api_migration import mark_fastapi_migration_done
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import DagRunState
from airflow.www.extensions.init_auth_manager import get_auth_manager
@@ -37,6 +38,7 @@ if TYPE_CHECKING:
from airflow.api_connexion.types import APIResponse
+@mark_fastapi_migration_done
@security.requires_access_dag("GET", DagAccessEntity.RUN)
@provide_session
def get_dag_stats(
diff --git a/airflow/api_fastapi/common/db/common.py
b/airflow/api_fastapi/common/db/common.py
index 3feb3ba599..01e1fe532b 100644
--- a/airflow/api_fastapi/common/db/common.py
+++ b/airflow/api_fastapi/common/db/common.py
@@ -65,13 +65,16 @@ def paginated_select(
offset: BaseParam | None = None,
limit: BaseParam | None = None,
session: Session = NEW_SESSION,
+ return_total_entries: bool = True,
) -> Select:
base_select = apply_filters_to_select(
base_select,
filters,
)
- total_entries = get_query_count(base_select, session=session)
+ total_entries = None
+ if return_total_entries:
+ total_entries = get_query_count(base_select, session=session)
# TODO: Re-enable when permissions are handled. Readable / writable
entities,
# for instance:
diff --git a/airflow/api_fastapi/common/db/dag_runs.py
b/airflow/api_fastapi/common/db/dag_runs.py
new file mode 100644
index 0000000000..8f4b02a067
--- /dev/null
+++ b/airflow/api_fastapi/common/db/dag_runs.py
@@ -0,0 +1,32 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from sqlalchemy import func, select
+
+from airflow.models.dagrun import DagRun
+
+dagruns_select_with_state_count = (
+ select(
+ DagRun.dag_id,
+ DagRun.state,
+ func.count(DagRun.state),
+ )
+ .group_by(DagRun.dag_id, DagRun.state)
+ .order_by(DagRun.dag_id)
+)
diff --git a/airflow/api_fastapi/common/parameters.py
b/airflow/api_fastapi/common/parameters.py
index 218077ca59..64ae9406f0 100644
--- a/airflow/api_fastapi/common/parameters.py
+++ b/airflow/api_fastapi/common/parameters.py
@@ -112,6 +112,18 @@ class _OnlyActiveFilter(BaseParam[bool]):
return self.set_value(only_active)
+class _DagIdsFilter(BaseParam[list[str]]):
+ """Filter on multi-valued dag_ids param for DagRun."""
+
+ def to_orm(self, select: Select) -> Select:
+ if self.value and self.skip_none:
+ return select.where(DagRun.dag_id.in_(self.value))
+ return select
+
+ def depends(self, dag_ids: list[str] = Query(None)) -> _DagIdsFilter:
+ return self.set_value(dag_ids)
+
+
class _SearchParam(BaseParam[str]):
"""Search on attribute."""
@@ -325,6 +337,7 @@ class _DagIdFilter(BaseParam[str]):
# Common Safe DateTime
DateTimeQuery = Annotated[str, AfterValidator(_safe_parse_datetime)]
+
# DAG
QueryLimit = Annotated[_LimitFilter, Depends(_LimitFilter().depends)]
QueryOffset = Annotated[_OffsetFilter, Depends(_OffsetFilter().depends)]
@@ -339,10 +352,14 @@ QueryDagIdPatternSearchWithNone = Annotated[
]
QueryTagsFilter = Annotated[_TagsFilter, Depends(_TagsFilter().depends)]
QueryOwnersFilter = Annotated[_OwnersFilter, Depends(_OwnersFilter().depends)]
+
# DagRun
QueryLastDagRunStateFilter = Annotated[_LastDagRunStateFilter,
Depends(_LastDagRunStateFilter().depends)]
+QueryDagIdsFilter = Annotated[_DagIdsFilter, Depends(_DagIdsFilter().depends)]
+
# DAGWarning
QueryDagIdInDagWarningFilter = Annotated[_DagIdFilter,
Depends(_DagIdFilter(DagWarning.dag_id).depends)]
QueryWarningTypeFilter = Annotated[_WarningTypeFilter,
Depends(_WarningTypeFilter().depends)]
+
# DAGTags
QueryDagTagPatternSearch = Annotated[_DagTagNamePatternSearch,
Depends(_DagTagNamePatternSearch().depends)]
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index 06a041b55d..5f60903947 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -2462,6 +2462,59 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/VersionInfo'
+ /public/dagStats/:
+ get:
+ tags:
+ - DagStats
+ summary: Get Dag Stats
+ description: Get Dag statistics.
+ operationId: get_dag_stats
+ parameters:
+ - name: dag_ids
+ in: query
+ required: false
+ schema:
+ type: array
+ items:
+ type: string
+ title: Dag Ids
+ responses:
+ '200':
+ description: Successful Response
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/DagStatsCollectionResponse'
+ '400':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Bad Request
+ '401':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Unauthorized
+ '403':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Forbidden
+ '404':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Not Found
+ '422':
+ description: Validation Error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPValidationError'
components:
schemas:
AppBuilderMenuItemResponse:
@@ -3467,6 +3520,51 @@ components:
- asset_triggered
title: DagRunType
description: Class with DagRun types.
+ DagStatsCollectionResponse:
+ properties:
+ dags:
+ items:
+ $ref: '#/components/schemas/DagStatsResponse'
+ type: array
+ title: Dags
+ total_entries:
+ type: integer
+ title: Total Entries
+ type: object
+ required:
+ - dags
+ - total_entries
+ title: DagStatsCollectionResponse
+ description: DAG Stats Collection serializer for responses.
+ DagStatsResponse:
+ properties:
+ dag_id:
+ type: string
+ title: Dag Id
+ stats:
+ items:
+ $ref: '#/components/schemas/DagStatsStateResponse'
+ type: array
+ title: Stats
+ type: object
+ required:
+ - dag_id
+ - stats
+ title: DagStatsResponse
+ description: DAG Stats serializer for responses.
+ DagStatsStateResponse:
+ properties:
+ state:
+ $ref: '#/components/schemas/DagRunState'
+ count:
+ type: integer
+ title: Count
+ type: object
+ required:
+ - state
+ - count
+ title: DagStatsStateResponse
+ description: DagStatsState serializer for responses.
DagTagPydantic:
properties:
name:
diff --git a/airflow/api_fastapi/core_api/routes/public/__init__.py
b/airflow/api_fastapi/core_api/routes/public/__init__.py
index 68caa2d775..b7c8affe4a 100644
--- a/airflow/api_fastapi/core_api/routes/public/__init__.py
+++ b/airflow/api_fastapi/core_api/routes/public/__init__.py
@@ -22,6 +22,7 @@ from airflow.api_fastapi.core_api.routes.public.backfills
import backfills_route
from airflow.api_fastapi.core_api.routes.public.connections import
connections_router
from airflow.api_fastapi.core_api.routes.public.dag_run import dag_run_router
from airflow.api_fastapi.core_api.routes.public.dag_sources import
dag_sources_router
+from airflow.api_fastapi.core_api.routes.public.dag_stats import
dag_stats_router
from airflow.api_fastapi.core_api.routes.public.dag_warning import
dag_warning_router
from airflow.api_fastapi.core_api.routes.public.dags import dags_router
from airflow.api_fastapi.core_api.routes.public.event_logs import
event_logs_router
@@ -54,3 +55,4 @@ public_router.include_router(task_instances_router)
public_router.include_router(variables_router)
public_router.include_router(variables_router)
public_router.include_router(version_router)
+public_router.include_router(dag_stats_router)
diff --git a/airflow/api_fastapi/core_api/routes/public/dag_stats.py
b/airflow/api_fastapi/core_api/routes/public/dag_stats.py
new file mode 100644
index 0000000000..deed40f011
--- /dev/null
+++ b/airflow/api_fastapi/core_api/routes/public/dag_stats.py
@@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from fastapi import Depends
+from sqlalchemy.orm import Session
+from typing_extensions import Annotated
+
+from airflow.api_fastapi.common.db.common import (
+ get_session,
+ paginated_select,
+)
+from airflow.api_fastapi.common.db.dag_runs import
dagruns_select_with_state_count
+from airflow.api_fastapi.common.parameters import QueryDagIdsFilter
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.openapi.exceptions import
create_openapi_http_exception_doc
+from airflow.api_fastapi.core_api.serializers.dag_stats import (
+ DagStatsCollectionResponse,
+ DagStatsResponse,
+ DagStatsStateResponse,
+)
+from airflow.utils.state import DagRunState
+
+dag_stats_router = AirflowRouter(tags=["DagStats"], prefix="/dagStats")
+
+
+@dag_stats_router.get(
+ "/",
+ responses=create_openapi_http_exception_doc([400, 401, 403, 404]),
+)
+async def get_dag_stats(
+ session: Annotated[Session, Depends(get_session)],
+ dag_ids: QueryDagIdsFilter,
+) -> DagStatsCollectionResponse:
+ """Get Dag statistics."""
+ dagruns_select, _ = paginated_select(
+ base_select=dagruns_select_with_state_count,
+ filters=[dag_ids],
+ session=session,
+ return_total_entries=False,
+ )
+ query_result = session.execute(dagruns_select)
+
+ result_dag_ids = []
+ dag_state_data = {}
+ for dag_id, state, count in query_result:
+ dag_state_data[(dag_id, state)] = count
+ if dag_id not in result_dag_ids:
+ result_dag_ids.append(dag_id)
+
+ dags = [
+ DagStatsResponse(
+ dag_id=dag_id,
+ stats=[
+ DagStatsStateResponse(
+ state=state,
+ count=dag_state_data.get((dag_id, state), 0),
+ )
+ for state in DagRunState
+ ],
+ )
+ for dag_id in result_dag_ids
+ ]
+ return DagStatsCollectionResponse(dags=dags, total_entries=len(dags))
diff --git a/airflow/api_fastapi/core_api/serializers/dag_stats.py
b/airflow/api_fastapi/core_api/serializers/dag_stats.py
new file mode 100644
index 0000000000..0d768c2cba
--- /dev/null
+++ b/airflow/api_fastapi/core_api/serializers/dag_stats.py
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from pydantic import BaseModel
+
+from airflow.utils.state import DagRunState
+
+
+class DagStatsStateResponse(BaseModel):
+ """DagStatsState serializer for responses."""
+
+ state: DagRunState
+ count: int
+
+
+class DagStatsResponse(BaseModel):
+ """DAG Stats serializer for responses."""
+
+ dag_id: str
+ stats: list[DagStatsStateResponse]
+
+
+class DagStatsCollectionResponse(BaseModel):
+ """DAG Stats Collection serializer for responses."""
+
+ dags: list[DagStatsResponse]
+ total_entries: int
diff --git a/airflow/ui/openapi-gen/queries/common.ts
b/airflow/ui/openapi-gen/queries/common.ts
index 2ed842201c..cec1f0f314 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -8,6 +8,7 @@ import {
DagRunService,
DagService,
DagSourceService,
+ DagStatsService,
DagWarningService,
DagsService,
DashboardService,
@@ -679,6 +680,22 @@ export const UseVersionServiceGetVersionKeyFn =
(queryKey?: Array<unknown>) => [
useVersionServiceGetVersionKey,
...(queryKey ?? []),
];
+export type DagStatsServiceGetDagStatsDefaultResponse = Awaited<
+ ReturnType<typeof DagStatsService.getDagStats>
+>;
+export type DagStatsServiceGetDagStatsQueryResult<
+ TData = DagStatsServiceGetDagStatsDefaultResponse,
+ TError = unknown,
+> = UseQueryResult<TData, TError>;
+export const useDagStatsServiceGetDagStatsKey = "DagStatsServiceGetDagStats";
+export const UseDagStatsServiceGetDagStatsKeyFn = (
+ {
+ dagIds,
+ }: {
+ dagIds?: string[];
+ } = {},
+ queryKey?: Array<unknown>,
+) => [useDagStatsServiceGetDagStatsKey, ...(queryKey ?? [{ dagIds }])];
export type BackfillServiceCreateBackfillMutationResult = Awaited<
ReturnType<typeof BackfillService.createBackfill>
>;
diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts
b/airflow/ui/openapi-gen/queries/prefetch.ts
index 6c41b7a5a1..04443427ac 100644
--- a/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -8,6 +8,7 @@ import {
DagRunService,
DagService,
DagSourceService,
+ DagStatsService,
DagWarningService,
DagsService,
DashboardService,
@@ -875,3 +876,23 @@ export const prefetchUseVersionServiceGetVersion =
(queryClient: QueryClient) =>
queryKey: Common.UseVersionServiceGetVersionKeyFn(),
queryFn: () => VersionService.getVersion(),
});
+/**
+ * Get Dag Stats
+ * Get Dag statistics.
+ * @param data The data for the request.
+ * @param data.dagIds
+ * @returns DagStatsCollectionResponse Successful Response
+ * @throws ApiError
+ */
+export const prefetchUseDagStatsServiceGetDagStats = (
+ queryClient: QueryClient,
+ {
+ dagIds,
+ }: {
+ dagIds?: string[];
+ } = {},
+) =>
+ queryClient.prefetchQuery({
+ queryKey: Common.UseDagStatsServiceGetDagStatsKeyFn({ dagIds }),
+ queryFn: () => DagStatsService.getDagStats({ dagIds }),
+ });
diff --git a/airflow/ui/openapi-gen/queries/queries.ts
b/airflow/ui/openapi-gen/queries/queries.ts
index 583f14f771..11dea6f3df 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -13,6 +13,7 @@ import {
DagRunService,
DagService,
DagSourceService,
+ DagStatsService,
DagWarningService,
DagsService,
DashboardService,
@@ -1093,6 +1094,32 @@ export const useVersionServiceGetVersion = <
queryFn: () => VersionService.getVersion() as TData,
...options,
});
+/**
+ * Get Dag Stats
+ * Get Dag statistics.
+ * @param data The data for the request.
+ * @param data.dagIds
+ * @returns DagStatsCollectionResponse Successful Response
+ * @throws ApiError
+ */
+export const useDagStatsServiceGetDagStats = <
+ TData = Common.DagStatsServiceGetDagStatsDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ dagIds,
+ }: {
+ dagIds?: string[];
+ } = {},
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useQuery<TData, TError>({
+ queryKey: Common.UseDagStatsServiceGetDagStatsKeyFn({ dagIds }, queryKey),
+ queryFn: () => DagStatsService.getDagStats({ dagIds }) as TData,
+ ...options,
+ });
/**
* Create Backfill
* @param data The data for the request.
diff --git a/airflow/ui/openapi-gen/queries/suspense.ts
b/airflow/ui/openapi-gen/queries/suspense.ts
index 2870605672..eed1a0afe8 100644
--- a/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow/ui/openapi-gen/queries/suspense.ts
@@ -8,6 +8,7 @@ import {
DagRunService,
DagService,
DagSourceService,
+ DagStatsService,
DagWarningService,
DagsService,
DashboardService,
@@ -1078,3 +1079,29 @@ export const useVersionServiceGetVersionSuspense = <
queryFn: () => VersionService.getVersion() as TData,
...options,
});
+/**
+ * Get Dag Stats
+ * Get Dag statistics.
+ * @param data The data for the request.
+ * @param data.dagIds
+ * @returns DagStatsCollectionResponse Successful Response
+ * @throws ApiError
+ */
+export const useDagStatsServiceGetDagStatsSuspense = <
+ TData = Common.DagStatsServiceGetDagStatsDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ dagIds,
+ }: {
+ dagIds?: string[];
+ } = {},
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useSuspenseQuery<TData, TError>({
+ queryKey: Common.UseDagStatsServiceGetDagStatsKeyFn({ dagIds }, queryKey),
+ queryFn: () => DagStatsService.getDagStats({ dagIds }) as TData,
+ ...options,
+ });
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index c1dc8cd345..d64abb3853 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -1580,6 +1580,62 @@ export const $DagRunType = {
description: "Class with DagRun types.",
} as const;
+export const $DagStatsCollectionResponse = {
+ properties: {
+ dags: {
+ items: {
+ $ref: "#/components/schemas/DagStatsResponse",
+ },
+ type: "array",
+ title: "Dags",
+ },
+ total_entries: {
+ type: "integer",
+ title: "Total Entries",
+ },
+ },
+ type: "object",
+ required: ["dags", "total_entries"],
+ title: "DagStatsCollectionResponse",
+ description: "DAG Stats Collection serializer for responses.",
+} as const;
+
+export const $DagStatsResponse = {
+ properties: {
+ dag_id: {
+ type: "string",
+ title: "Dag Id",
+ },
+ stats: {
+ items: {
+ $ref: "#/components/schemas/DagStatsStateResponse",
+ },
+ type: "array",
+ title: "Stats",
+ },
+ },
+ type: "object",
+ required: ["dag_id", "stats"],
+ title: "DagStatsResponse",
+ description: "DAG Stats serializer for responses.",
+} as const;
+
+export const $DagStatsStateResponse = {
+ properties: {
+ state: {
+ $ref: "#/components/schemas/DagRunState",
+ },
+ count: {
+ type: "integer",
+ title: "Count",
+ },
+ },
+ type: "object",
+ required: ["state", "count"],
+ title: "DagStatsStateResponse",
+ description: "DagStatsState serializer for responses.",
+} as const;
+
export const $DagTagPydantic = {
properties: {
name: {
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts
b/airflow/ui/openapi-gen/requests/services.gen.ts
index 4eecb848a5..915636fbb5 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -91,6 +91,8 @@ import type {
PostVariableData,
PostVariableResponse,
GetVersionResponse,
+ GetDagStatsData,
+ GetDagStatsResponse,
} from "./types.gen";
export class AssetService {
@@ -1415,3 +1417,32 @@ export class VersionService {
});
}
}
+
+export class DagStatsService {
+ /**
+ * Get Dag Stats
+ * Get Dag statistics.
+ * @param data The data for the request.
+ * @param data.dagIds
+ * @returns DagStatsCollectionResponse Successful Response
+ * @throws ApiError
+ */
+ public static getDagStats(
+ data: GetDagStatsData = {},
+ ): CancelablePromise<GetDagStatsResponse> {
+ return __request(OpenAPI, {
+ method: "GET",
+ url: "/public/dagStats/",
+ query: {
+ dag_ids: data.dagIds,
+ },
+ errors: {
+ 400: "Bad Request",
+ 401: "Unauthorized",
+ 403: "Forbidden",
+ 404: "Not Found",
+ 422: "Validation Error",
+ },
+ });
+ }
+}
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow/ui/openapi-gen/requests/types.gen.ts
index 603a20d090..f96acacf61 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -347,6 +347,30 @@ export type DagRunType =
| "manual"
| "asset_triggered";
+/**
+ * DAG Stats Collection serializer for responses.
+ */
+export type DagStatsCollectionResponse = {
+ dags: Array<DagStatsResponse>;
+ total_entries: number;
+};
+
+/**
+ * DAG Stats serializer for responses.
+ */
+export type DagStatsResponse = {
+ dag_id: string;
+ stats: Array<DagStatsStateResponse>;
+};
+
+/**
+ * DagStatsState serializer for responses.
+ */
+export type DagStatsStateResponse = {
+ state: DagRunState;
+ count: number;
+};
+
/**
* Serializable representation of the DagTag ORM SqlAlchemyModel used by
internal API.
*/
@@ -1045,6 +1069,12 @@ export type PostVariableResponse = VariableResponse;
export type GetVersionResponse = VersionInfo;
+export type GetDagStatsData = {
+ dagIds?: Array<string>;
+};
+
+export type GetDagStatsResponse = DagStatsCollectionResponse;
+
export type $OpenApiTs = {
"/ui/next_run_assets/{dag_id}": {
get: {
@@ -2165,4 +2195,35 @@ export type $OpenApiTs = {
};
};
};
+ "/public/dagStats/": {
+ get: {
+ req: GetDagStatsData;
+ res: {
+ /**
+ * Successful Response
+ */
+ 200: DagStatsCollectionResponse;
+ /**
+ * Bad Request
+ */
+ 400: HTTPExceptionResponse;
+ /**
+ * Unauthorized
+ */
+ 401: HTTPExceptionResponse;
+ /**
+ * Forbidden
+ */
+ 403: HTTPExceptionResponse;
+ /**
+ * Not Found
+ */
+ 404: HTTPExceptionResponse;
+ /**
+ * Validation Error
+ */
+ 422: HTTPValidationError;
+ };
+ };
+ };
};
diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_stats.py
b/tests/api_fastapi/core_api/routes/public/test_dag_stats.py
new file mode 100644
index 0000000000..e2611addd7
--- /dev/null
+++ b/tests/api_fastapi/core_api/routes/public/test_dag_stats.py
@@ -0,0 +1,416 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import datetime, timedelta
+
+import pytest
+
+from airflow.models.dag import DagModel
+from airflow.models.dagrun import DagRun
+from airflow.utils import timezone
+from airflow.utils.state import DagRunState
+from airflow.utils.types import DagRunType
+
+from tests_common.test_utils.db import clear_db_dags, clear_db_runs,
clear_db_serialized_dags
+
+pytestmark = pytest.mark.db_test
+
+DAG1_ID = "test_dag1"
+DAG2_ID = "test_dag2"
+DAG3_ID = "test_dag3"
+TASK_ID = "op1"
+API_PREFIX = "/public/dagStats"
+
+
+class TestDagStatsEndpoint:
+ default_time = "2020-06-11T18:00:00+00:00"
+
+ @staticmethod
+ def _clear_db():
+ clear_db_runs()
+ clear_db_dags()
+ clear_db_serialized_dags()
+
+ def _create_dag_and_runs(self, session=None):
+ dag_1 = DagModel(
+ dag_id=DAG1_ID,
+ fileloc="/tmp/dag_stats_1.py",
+ timetable_summary="2 2 * * *",
+ is_active=False,
+ is_paused=True,
+ owners="test_owner,another_test_owner",
+ next_dagrun=datetime(2021, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
+ )
+ dag_1_run_1 = DagRun(
+ dag_id=DAG1_ID,
+ run_id="test_dag_run_id_1",
+ run_type=DagRunType.MANUAL,
+ execution_date=timezone.parse(self.default_time),
+ start_date=timezone.parse(self.default_time),
+ external_trigger=True,
+ state="running",
+ )
+ dag_1_run_2 = DagRun(
+ dag_id=dag_1.dag_id,
+ run_id="test_dag_run_id_2",
+ run_type=DagRunType.MANUAL,
+ execution_date=timezone.parse(self.default_time) +
timedelta(days=1),
+ start_date=timezone.parse(self.default_time),
+ external_trigger=True,
+ state="failed",
+ )
+ dag_2 = DagModel(
+ dag_id=DAG2_ID,
+ fileloc="/tmp/dag_stats_2.py",
+ timetable_summary="2 2 * * *",
+ is_active=False,
+ is_paused=True,
+ owners="test_owner,another_test_owner",
+ next_dagrun=datetime(2021, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
+ )
+ dag_2_run_1 = DagRun(
+ dag_id=dag_2.dag_id,
+ run_id="test_dag_2_run_id_1",
+ run_type=DagRunType.MANUAL,
+ execution_date=timezone.parse(self.default_time),
+ start_date=timezone.parse(self.default_time),
+ external_trigger=True,
+ state="queued",
+ )
+ dag_3 = DagModel(
+ dag_id=DAG3_ID,
+ fileloc="/tmp/dag_stats_3.py",
+ timetable_summary="2 2 * * *",
+ is_active=False,
+ is_paused=True,
+ owners="test_owner,another_test_owner",
+ next_dagrun=datetime(2021, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
+ )
+ dag_3_run_1 = DagRun(
+ dag_id=dag_3.dag_id,
+ run_id="test_dag_3_run_id_1",
+ run_type=DagRunType.MANUAL,
+ execution_date=timezone.parse(self.default_time),
+ start_date=timezone.parse(self.default_time),
+ external_trigger=True,
+ state="success",
+ )
+ entities = (
+ dag_1,
+ dag_1_run_1,
+ dag_1_run_2,
+ dag_2,
+ dag_2_run_1,
+ dag_3,
+ dag_3_run_1,
+ )
+ session.add_all(entities)
+ session.commit()
+
+ @pytest.fixture(autouse=True)
+ def setup(self) -> None:
+ self._clear_db()
+
+ def teardown_method(self) -> None:
+ self._clear_db()
+
+
+class TestGetDagStats(TestDagStatsEndpoint):
+ """Unit tests for Get DAG Stats."""
+
+ def test_should_respond_200(self, client, session):
+ self._create_dag_and_runs(session)
+ exp_payload = {
+ "dags": [
+ {
+ "dag_id": DAG1_ID,
+ "stats": [
+ {
+ "state": DagRunState.QUEUED,
+ "count": 0,
+ },
+ {
+ "state": DagRunState.RUNNING,
+ "count": 1,
+ },
+ {
+ "state": DagRunState.SUCCESS,
+ "count": 0,
+ },
+ {
+ "state": DagRunState.FAILED,
+ "count": 1,
+ },
+ ],
+ },
+ {
+ "dag_id": DAG2_ID,
+ "stats": [
+ {
+ "state": DagRunState.QUEUED,
+ "count": 1,
+ },
+ {
+ "state": DagRunState.RUNNING,
+ "count": 0,
+ },
+ {
+ "state": DagRunState.SUCCESS,
+ "count": 0,
+ },
+ {
+ "state": DagRunState.FAILED,
+ "count": 0,
+ },
+ ],
+ },
+ ],
+ "total_entries": 2,
+ }
+
+ response =
client().get(f"{API_PREFIX}?dag_ids={DAG1_ID}&dag_ids={DAG2_ID}")
+ assert response.status_code == 200
+ res_json = response.json()
+ assert res_json["total_entries"] == len(res_json["dags"])
+ assert res_json == exp_payload
+
+ def test_all_dags_should_respond_200(self, client, session):
+ self._create_dag_and_runs(session)
+ exp_payload = {
+ "dags": [
+ {
+ "dag_id": DAG1_ID,
+ "stats": [
+ {
+ "state": DagRunState.QUEUED,
+ "count": 0,
+ },
+ {
+ "state": DagRunState.RUNNING,
+ "count": 1,
+ },
+ {
+ "state": DagRunState.SUCCESS,
+ "count": 0,
+ },
+ {
+ "state": DagRunState.FAILED,
+ "count": 1,
+ },
+ ],
+ },
+ {
+ "dag_id": DAG2_ID,
+ "stats": [
+ {
+ "state": DagRunState.QUEUED,
+ "count": 1,
+ },
+ {
+ "state": DagRunState.RUNNING,
+ "count": 0,
+ },
+ {
+ "state": DagRunState.SUCCESS,
+ "count": 0,
+ },
+ {
+ "state": DagRunState.FAILED,
+ "count": 0,
+ },
+ ],
+ },
+ {
+ "dag_id": DAG3_ID,
+ "stats": [
+ {
+ "state": DagRunState.QUEUED,
+ "count": 0,
+ },
+ {
+ "state": DagRunState.RUNNING,
+ "count": 0,
+ },
+ {
+ "state": DagRunState.SUCCESS,
+ "count": 1,
+ },
+ {
+ "state": DagRunState.FAILED,
+ "count": 0,
+ },
+ ],
+ },
+ ],
+ "total_entries": 3,
+ }
+
+ response = client().get(API_PREFIX)
+ assert response.status_code == 200
+ res_json = response.json()
+ assert res_json["total_entries"] == len(res_json["dags"])
+ assert res_json == exp_payload
+
+ @pytest.mark.parametrize(
+ "url, params, exp_payload",
+ [
+ (
+ API_PREFIX,
+ [
+ ("dag_ids", DAG1_ID),
+ ("dag_ids", DAG3_ID),
+ ("dag_ids", DAG2_ID),
+ ],
+ {
+ "dags": [
+ {
+ "dag_id": DAG1_ID,
+ "stats": [
+ {
+ "state": DagRunState.QUEUED,
+ "count": 0,
+ },
+ {
+ "state": DagRunState.RUNNING,
+ "count": 1,
+ },
+ {
+ "state": DagRunState.SUCCESS,
+ "count": 0,
+ },
+ {
+ "state": DagRunState.FAILED,
+ "count": 1,
+ },
+ ],
+ },
+ {
+ "dag_id": DAG2_ID,
+ "stats": [
+ {
+ "state": DagRunState.QUEUED,
+ "count": 1,
+ },
+ {
+ "state": DagRunState.RUNNING,
+ "count": 0,
+ },
+ {
+ "state": DagRunState.SUCCESS,
+ "count": 0,
+ },
+ {
+ "state": DagRunState.FAILED,
+ "count": 0,
+ },
+ ],
+ },
+ {
+ "dag_id": DAG3_ID,
+ "stats": [
+ {
+ "state": DagRunState.QUEUED,
+ "count": 0,
+ },
+ {
+ "state": DagRunState.RUNNING,
+ "count": 0,
+ },
+ {
+ "state": DagRunState.SUCCESS,
+ "count": 1,
+ },
+ {
+ "state": DagRunState.FAILED,
+ "count": 0,
+ },
+ ],
+ },
+ ],
+ "total_entries": 3,
+ },
+ ),
+ (
+ API_PREFIX,
+ [("dag_ids", DAG1_ID)],
+ {
+ "dags": [
+ {
+ "dag_id": DAG1_ID,
+ "stats": [
+ {
+ "state": DagRunState.QUEUED,
+ "count": 0,
+ },
+ {
+ "state": DagRunState.RUNNING,
+ "count": 1,
+ },
+ {
+ "state": DagRunState.SUCCESS,
+ "count": 0,
+ },
+ {
+ "state": DagRunState.FAILED,
+ "count": 1,
+ },
+ ],
+ }
+ ],
+ "total_entries": 1,
+ },
+ ),
+ (
+ API_PREFIX,
+ [("dag_ids", DAG3_ID)],
+ {
+ "dags": [
+ {
+ "dag_id": DAG3_ID,
+ "stats": [
+ {
+ "state": DagRunState.QUEUED,
+ "count": 0,
+ },
+ {
+ "state": DagRunState.RUNNING,
+ "count": 0,
+ },
+ {
+ "state": DagRunState.SUCCESS,
+ "count": 1,
+ },
+ {
+ "state": DagRunState.FAILED,
+ "count": 0,
+ },
+ ],
+ },
+ ],
+ "total_entries": 1,
+ },
+ ),
+ ],
+ )
+ def test_single_dag_in_dag_ids(self, client, session, url, params,
exp_payload):
+ self._create_dag_and_runs(session)
+ response = client().get(url, params=params)
+ assert response.status_code == 200
+ res_json = response.json()
+ assert res_json["total_entries"] == len(res_json["dags"])
+ assert res_json == exp_payload