This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 69c1f92eae Migrate the public endpoint Get DAG Stats to FastAPI 
(#43255)
69c1f92eae is described below

commit 69c1f92eae21ad0516c96298a210597269fddab9
Author: Omkar P <[email protected]>
AuthorDate: Tue Nov 5 21:28:11 2024 +0530

    Migrate the public endpoint Get DAG Stats to FastAPI (#43255)
    
    * Migrate public endpoint Get DAG Stats to FastAPI, with main resynced
    
    * Re-run static checks
    
    * Add newlines to separate entities
---
 .../api_connexion/endpoints/dag_stats_endpoint.py  |   2 +
 airflow/api_fastapi/common/db/common.py            |   5 +-
 airflow/api_fastapi/common/db/dag_runs.py          |  32 ++
 airflow/api_fastapi/common/parameters.py           |  17 +
 .../api_fastapi/core_api/openapi/v1-generated.yaml |  98 +++++
 .../api_fastapi/core_api/routes/public/__init__.py |   2 +
 .../core_api/routes/public/dag_stats.py            |  79 ++++
 .../api_fastapi/core_api/serializers/dag_stats.py  |  43 +++
 airflow/ui/openapi-gen/queries/common.ts           |  17 +
 airflow/ui/openapi-gen/queries/prefetch.ts         |  21 ++
 airflow/ui/openapi-gen/queries/queries.ts          |  27 ++
 airflow/ui/openapi-gen/queries/suspense.ts         |  27 ++
 airflow/ui/openapi-gen/requests/schemas.gen.ts     |  56 +++
 airflow/ui/openapi-gen/requests/services.gen.ts    |  31 ++
 airflow/ui/openapi-gen/requests/types.gen.ts       |  61 +++
 .../core_api/routes/public/test_dag_stats.py       | 416 +++++++++++++++++++++
 16 files changed, 933 insertions(+), 1 deletion(-)

diff --git a/airflow/api_connexion/endpoints/dag_stats_endpoint.py 
b/airflow/api_connexion/endpoints/dag_stats_endpoint.py
index 3b6c6ab8e0..c4d8701f8d 100644
--- a/airflow/api_connexion/endpoints/dag_stats_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_stats_endpoint.py
@@ -27,6 +27,7 @@ from airflow.api_connexion.schemas.dag_stats_schema import (
 )
 from airflow.auth.managers.models.resource_details import DagAccessEntity
 from airflow.models.dag import DagRun
+from airflow.utils.api_migration import mark_fastapi_migration_done
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.state import DagRunState
 from airflow.www.extensions.init_auth_manager import get_auth_manager
@@ -37,6 +38,7 @@ if TYPE_CHECKING:
     from airflow.api_connexion.types import APIResponse
 
 
+@mark_fastapi_migration_done
 @security.requires_access_dag("GET", DagAccessEntity.RUN)
 @provide_session
 def get_dag_stats(
diff --git a/airflow/api_fastapi/common/db/common.py 
b/airflow/api_fastapi/common/db/common.py
index 3feb3ba599..01e1fe532b 100644
--- a/airflow/api_fastapi/common/db/common.py
+++ b/airflow/api_fastapi/common/db/common.py
@@ -65,13 +65,16 @@ def paginated_select(
     offset: BaseParam | None = None,
     limit: BaseParam | None = None,
     session: Session = NEW_SESSION,
+    return_total_entries: bool = True,
 ) -> Select:
     base_select = apply_filters_to_select(
         base_select,
         filters,
     )
 
-    total_entries = get_query_count(base_select, session=session)
+    total_entries = None
+    if return_total_entries:
+        total_entries = get_query_count(base_select, session=session)
 
     # TODO: Re-enable when permissions are handled. Readable / writable 
entities,
     # for instance:
diff --git a/airflow/api_fastapi/common/db/dag_runs.py 
b/airflow/api_fastapi/common/db/dag_runs.py
new file mode 100644
index 0000000000..8f4b02a067
--- /dev/null
+++ b/airflow/api_fastapi/common/db/dag_runs.py
@@ -0,0 +1,32 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from sqlalchemy import func, select
+
+from airflow.models.dagrun import DagRun
+
+dagruns_select_with_state_count = (
+    select(
+        DagRun.dag_id,
+        DagRun.state,
+        func.count(DagRun.state),
+    )
+    .group_by(DagRun.dag_id, DagRun.state)
+    .order_by(DagRun.dag_id)
+)
diff --git a/airflow/api_fastapi/common/parameters.py 
b/airflow/api_fastapi/common/parameters.py
index 218077ca59..64ae9406f0 100644
--- a/airflow/api_fastapi/common/parameters.py
+++ b/airflow/api_fastapi/common/parameters.py
@@ -112,6 +112,18 @@ class _OnlyActiveFilter(BaseParam[bool]):
         return self.set_value(only_active)
 
 
+class _DagIdsFilter(BaseParam[list[str]]):
+    """Filter on multi-valued dag_ids param for DagRun."""
+
+    def to_orm(self, select: Select) -> Select:
+        if self.value and self.skip_none:
+            return select.where(DagRun.dag_id.in_(self.value))
+        return select
+
+    def depends(self, dag_ids: list[str] = Query(None)) -> _DagIdsFilter:
+        return self.set_value(dag_ids)
+
+
 class _SearchParam(BaseParam[str]):
     """Search on attribute."""
 
@@ -325,6 +337,7 @@ class _DagIdFilter(BaseParam[str]):
 
 # Common Safe DateTime
 DateTimeQuery = Annotated[str, AfterValidator(_safe_parse_datetime)]
+
 # DAG
 QueryLimit = Annotated[_LimitFilter, Depends(_LimitFilter().depends)]
 QueryOffset = Annotated[_OffsetFilter, Depends(_OffsetFilter().depends)]
@@ -339,10 +352,14 @@ QueryDagIdPatternSearchWithNone = Annotated[
 ]
 QueryTagsFilter = Annotated[_TagsFilter, Depends(_TagsFilter().depends)]
 QueryOwnersFilter = Annotated[_OwnersFilter, Depends(_OwnersFilter().depends)]
+
 # DagRun
 QueryLastDagRunStateFilter = Annotated[_LastDagRunStateFilter, 
Depends(_LastDagRunStateFilter().depends)]
+QueryDagIdsFilter = Annotated[_DagIdsFilter, Depends(_DagIdsFilter().depends)]
+
 # DAGWarning
 QueryDagIdInDagWarningFilter = Annotated[_DagIdFilter, 
Depends(_DagIdFilter(DagWarning.dag_id).depends)]
 QueryWarningTypeFilter = Annotated[_WarningTypeFilter, 
Depends(_WarningTypeFilter().depends)]
+
 # DAGTags
 QueryDagTagPatternSearch = Annotated[_DagTagNamePatternSearch, 
Depends(_DagTagNamePatternSearch().depends)]
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml 
b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index 06a041b55d..5f60903947 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -2462,6 +2462,59 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/VersionInfo'
+  /public/dagStats/:
+    get:
+      tags:
+      - DagStats
+      summary: Get Dag Stats
+      description: Get Dag statistics.
+      operationId: get_dag_stats
+      parameters:
+      - name: dag_ids
+        in: query
+        required: false
+        schema:
+          type: array
+          items:
+            type: string
+          title: Dag Ids
+      responses:
+        '200':
+          description: Successful Response
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/DagStatsCollectionResponse'
+        '400':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Bad Request
+        '401':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Unauthorized
+        '403':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Forbidden
+        '404':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Not Found
+        '422':
+          description: Validation Error
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPValidationError'
 components:
   schemas:
     AppBuilderMenuItemResponse:
@@ -3467,6 +3520,51 @@ components:
       - asset_triggered
       title: DagRunType
       description: Class with DagRun types.
+    DagStatsCollectionResponse:
+      properties:
+        dags:
+          items:
+            $ref: '#/components/schemas/DagStatsResponse'
+          type: array
+          title: Dags
+        total_entries:
+          type: integer
+          title: Total Entries
+      type: object
+      required:
+      - dags
+      - total_entries
+      title: DagStatsCollectionResponse
+      description: DAG Stats Collection serializer for responses.
+    DagStatsResponse:
+      properties:
+        dag_id:
+          type: string
+          title: Dag Id
+        stats:
+          items:
+            $ref: '#/components/schemas/DagStatsStateResponse'
+          type: array
+          title: Stats
+      type: object
+      required:
+      - dag_id
+      - stats
+      title: DagStatsResponse
+      description: DAG Stats serializer for responses.
+    DagStatsStateResponse:
+      properties:
+        state:
+          $ref: '#/components/schemas/DagRunState'
+        count:
+          type: integer
+          title: Count
+      type: object
+      required:
+      - state
+      - count
+      title: DagStatsStateResponse
+      description: DagStatsState serializer for responses.
     DagTagPydantic:
       properties:
         name:
diff --git a/airflow/api_fastapi/core_api/routes/public/__init__.py 
b/airflow/api_fastapi/core_api/routes/public/__init__.py
index 68caa2d775..b7c8affe4a 100644
--- a/airflow/api_fastapi/core_api/routes/public/__init__.py
+++ b/airflow/api_fastapi/core_api/routes/public/__init__.py
@@ -22,6 +22,7 @@ from airflow.api_fastapi.core_api.routes.public.backfills 
import backfills_route
 from airflow.api_fastapi.core_api.routes.public.connections import 
connections_router
 from airflow.api_fastapi.core_api.routes.public.dag_run import dag_run_router
 from airflow.api_fastapi.core_api.routes.public.dag_sources import 
dag_sources_router
+from airflow.api_fastapi.core_api.routes.public.dag_stats import 
dag_stats_router
 from airflow.api_fastapi.core_api.routes.public.dag_warning import 
dag_warning_router
 from airflow.api_fastapi.core_api.routes.public.dags import dags_router
 from airflow.api_fastapi.core_api.routes.public.event_logs import 
event_logs_router
@@ -54,3 +55,4 @@ public_router.include_router(task_instances_router)
 public_router.include_router(variables_router)
 public_router.include_router(variables_router)
 public_router.include_router(version_router)
+public_router.include_router(dag_stats_router)
diff --git a/airflow/api_fastapi/core_api/routes/public/dag_stats.py 
b/airflow/api_fastapi/core_api/routes/public/dag_stats.py
new file mode 100644
index 0000000000..deed40f011
--- /dev/null
+++ b/airflow/api_fastapi/core_api/routes/public/dag_stats.py
@@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from fastapi import Depends
+from sqlalchemy.orm import Session
+from typing_extensions import Annotated
+
+from airflow.api_fastapi.common.db.common import (
+    get_session,
+    paginated_select,
+)
+from airflow.api_fastapi.common.db.dag_runs import 
dagruns_select_with_state_count
+from airflow.api_fastapi.common.parameters import QueryDagIdsFilter
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.openapi.exceptions import 
create_openapi_http_exception_doc
+from airflow.api_fastapi.core_api.serializers.dag_stats import (
+    DagStatsCollectionResponse,
+    DagStatsResponse,
+    DagStatsStateResponse,
+)
+from airflow.utils.state import DagRunState
+
+dag_stats_router = AirflowRouter(tags=["DagStats"], prefix="/dagStats")
+
+
+@dag_stats_router.get(
+    "/",
+    responses=create_openapi_http_exception_doc([400, 401, 403, 404]),
+)
+async def get_dag_stats(
+    session: Annotated[Session, Depends(get_session)],
+    dag_ids: QueryDagIdsFilter,
+) -> DagStatsCollectionResponse:
+    """Get Dag statistics."""
+    dagruns_select, _ = paginated_select(
+        base_select=dagruns_select_with_state_count,
+        filters=[dag_ids],
+        session=session,
+        return_total_entries=False,
+    )
+    query_result = session.execute(dagruns_select)
+
+    result_dag_ids = []
+    dag_state_data = {}
+    for dag_id, state, count in query_result:
+        dag_state_data[(dag_id, state)] = count
+        if dag_id not in result_dag_ids:
+            result_dag_ids.append(dag_id)
+
+    dags = [
+        DagStatsResponse(
+            dag_id=dag_id,
+            stats=[
+                DagStatsStateResponse(
+                    state=state,
+                    count=dag_state_data.get((dag_id, state), 0),
+                )
+                for state in DagRunState
+            ],
+        )
+        for dag_id in result_dag_ids
+    ]
+    return DagStatsCollectionResponse(dags=dags, total_entries=len(dags))
diff --git a/airflow/api_fastapi/core_api/serializers/dag_stats.py 
b/airflow/api_fastapi/core_api/serializers/dag_stats.py
new file mode 100644
index 0000000000..0d768c2cba
--- /dev/null
+++ b/airflow/api_fastapi/core_api/serializers/dag_stats.py
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from pydantic import BaseModel
+
+from airflow.utils.state import DagRunState
+
+
+class DagStatsStateResponse(BaseModel):
+    """DagStatsState serializer for responses."""
+
+    state: DagRunState
+    count: int
+
+
+class DagStatsResponse(BaseModel):
+    """DAG Stats serializer for responses."""
+
+    dag_id: str
+    stats: list[DagStatsStateResponse]
+
+
+class DagStatsCollectionResponse(BaseModel):
+    """DAG Stats Collection serializer for responses."""
+
+    dags: list[DagStatsResponse]
+    total_entries: int
diff --git a/airflow/ui/openapi-gen/queries/common.ts 
b/airflow/ui/openapi-gen/queries/common.ts
index 2ed842201c..cec1f0f314 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -8,6 +8,7 @@ import {
   DagRunService,
   DagService,
   DagSourceService,
+  DagStatsService,
   DagWarningService,
   DagsService,
   DashboardService,
@@ -679,6 +680,22 @@ export const UseVersionServiceGetVersionKeyFn = 
(queryKey?: Array<unknown>) => [
   useVersionServiceGetVersionKey,
   ...(queryKey ?? []),
 ];
+export type DagStatsServiceGetDagStatsDefaultResponse = Awaited<
+  ReturnType<typeof DagStatsService.getDagStats>
+>;
+export type DagStatsServiceGetDagStatsQueryResult<
+  TData = DagStatsServiceGetDagStatsDefaultResponse,
+  TError = unknown,
+> = UseQueryResult<TData, TError>;
+export const useDagStatsServiceGetDagStatsKey = "DagStatsServiceGetDagStats";
+export const UseDagStatsServiceGetDagStatsKeyFn = (
+  {
+    dagIds,
+  }: {
+    dagIds?: string[];
+  } = {},
+  queryKey?: Array<unknown>,
+) => [useDagStatsServiceGetDagStatsKey, ...(queryKey ?? [{ dagIds }])];
 export type BackfillServiceCreateBackfillMutationResult = Awaited<
   ReturnType<typeof BackfillService.createBackfill>
 >;
diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts 
b/airflow/ui/openapi-gen/queries/prefetch.ts
index 6c41b7a5a1..04443427ac 100644
--- a/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -8,6 +8,7 @@ import {
   DagRunService,
   DagService,
   DagSourceService,
+  DagStatsService,
   DagWarningService,
   DagsService,
   DashboardService,
@@ -875,3 +876,23 @@ export const prefetchUseVersionServiceGetVersion = 
(queryClient: QueryClient) =>
     queryKey: Common.UseVersionServiceGetVersionKeyFn(),
     queryFn: () => VersionService.getVersion(),
   });
+/**
+ * Get Dag Stats
+ * Get Dag statistics.
+ * @param data The data for the request.
+ * @param data.dagIds
+ * @returns DagStatsCollectionResponse Successful Response
+ * @throws ApiError
+ */
+export const prefetchUseDagStatsServiceGetDagStats = (
+  queryClient: QueryClient,
+  {
+    dagIds,
+  }: {
+    dagIds?: string[];
+  } = {},
+) =>
+  queryClient.prefetchQuery({
+    queryKey: Common.UseDagStatsServiceGetDagStatsKeyFn({ dagIds }),
+    queryFn: () => DagStatsService.getDagStats({ dagIds }),
+  });
diff --git a/airflow/ui/openapi-gen/queries/queries.ts 
b/airflow/ui/openapi-gen/queries/queries.ts
index 583f14f771..11dea6f3df 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -13,6 +13,7 @@ import {
   DagRunService,
   DagService,
   DagSourceService,
+  DagStatsService,
   DagWarningService,
   DagsService,
   DashboardService,
@@ -1093,6 +1094,32 @@ export const useVersionServiceGetVersion = <
     queryFn: () => VersionService.getVersion() as TData,
     ...options,
   });
+/**
+ * Get Dag Stats
+ * Get Dag statistics.
+ * @param data The data for the request.
+ * @param data.dagIds
+ * @returns DagStatsCollectionResponse Successful Response
+ * @throws ApiError
+ */
+export const useDagStatsServiceGetDagStats = <
+  TData = Common.DagStatsServiceGetDagStatsDefaultResponse,
+  TError = unknown,
+  TQueryKey extends Array<unknown> = unknown[],
+>(
+  {
+    dagIds,
+  }: {
+    dagIds?: string[];
+  } = {},
+  queryKey?: TQueryKey,
+  options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+  useQuery<TData, TError>({
+    queryKey: Common.UseDagStatsServiceGetDagStatsKeyFn({ dagIds }, queryKey),
+    queryFn: () => DagStatsService.getDagStats({ dagIds }) as TData,
+    ...options,
+  });
 /**
  * Create Backfill
  * @param data The data for the request.
diff --git a/airflow/ui/openapi-gen/queries/suspense.ts 
b/airflow/ui/openapi-gen/queries/suspense.ts
index 2870605672..eed1a0afe8 100644
--- a/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow/ui/openapi-gen/queries/suspense.ts
@@ -8,6 +8,7 @@ import {
   DagRunService,
   DagService,
   DagSourceService,
+  DagStatsService,
   DagWarningService,
   DagsService,
   DashboardService,
@@ -1078,3 +1079,29 @@ export const useVersionServiceGetVersionSuspense = <
     queryFn: () => VersionService.getVersion() as TData,
     ...options,
   });
+/**
+ * Get Dag Stats
+ * Get Dag statistics.
+ * @param data The data for the request.
+ * @param data.dagIds
+ * @returns DagStatsCollectionResponse Successful Response
+ * @throws ApiError
+ */
+export const useDagStatsServiceGetDagStatsSuspense = <
+  TData = Common.DagStatsServiceGetDagStatsDefaultResponse,
+  TError = unknown,
+  TQueryKey extends Array<unknown> = unknown[],
+>(
+  {
+    dagIds,
+  }: {
+    dagIds?: string[];
+  } = {},
+  queryKey?: TQueryKey,
+  options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+  useSuspenseQuery<TData, TError>({
+    queryKey: Common.UseDagStatsServiceGetDagStatsKeyFn({ dagIds }, queryKey),
+    queryFn: () => DagStatsService.getDagStats({ dagIds }) as TData,
+    ...options,
+  });
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts 
b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index c1dc8cd345..d64abb3853 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -1580,6 +1580,62 @@ export const $DagRunType = {
   description: "Class with DagRun types.",
 } as const;
 
+export const $DagStatsCollectionResponse = {
+  properties: {
+    dags: {
+      items: {
+        $ref: "#/components/schemas/DagStatsResponse",
+      },
+      type: "array",
+      title: "Dags",
+    },
+    total_entries: {
+      type: "integer",
+      title: "Total Entries",
+    },
+  },
+  type: "object",
+  required: ["dags", "total_entries"],
+  title: "DagStatsCollectionResponse",
+  description: "DAG Stats Collection serializer for responses.",
+} as const;
+
+export const $DagStatsResponse = {
+  properties: {
+    dag_id: {
+      type: "string",
+      title: "Dag Id",
+    },
+    stats: {
+      items: {
+        $ref: "#/components/schemas/DagStatsStateResponse",
+      },
+      type: "array",
+      title: "Stats",
+    },
+  },
+  type: "object",
+  required: ["dag_id", "stats"],
+  title: "DagStatsResponse",
+  description: "DAG Stats serializer for responses.",
+} as const;
+
+export const $DagStatsStateResponse = {
+  properties: {
+    state: {
+      $ref: "#/components/schemas/DagRunState",
+    },
+    count: {
+      type: "integer",
+      title: "Count",
+    },
+  },
+  type: "object",
+  required: ["state", "count"],
+  title: "DagStatsStateResponse",
+  description: "DagStatsState serializer for responses.",
+} as const;
+
 export const $DagTagPydantic = {
   properties: {
     name: {
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts 
b/airflow/ui/openapi-gen/requests/services.gen.ts
index 4eecb848a5..915636fbb5 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -91,6 +91,8 @@ import type {
   PostVariableData,
   PostVariableResponse,
   GetVersionResponse,
+  GetDagStatsData,
+  GetDagStatsResponse,
 } from "./types.gen";
 
 export class AssetService {
@@ -1415,3 +1417,32 @@ export class VersionService {
     });
   }
 }
+
+export class DagStatsService {
+  /**
+   * Get Dag Stats
+   * Get Dag statistics.
+   * @param data The data for the request.
+   * @param data.dagIds
+   * @returns DagStatsCollectionResponse Successful Response
+   * @throws ApiError
+   */
+  public static getDagStats(
+    data: GetDagStatsData = {},
+  ): CancelablePromise<GetDagStatsResponse> {
+    return __request(OpenAPI, {
+      method: "GET",
+      url: "/public/dagStats/",
+      query: {
+        dag_ids: data.dagIds,
+      },
+      errors: {
+        400: "Bad Request",
+        401: "Unauthorized",
+        403: "Forbidden",
+        404: "Not Found",
+        422: "Validation Error",
+      },
+    });
+  }
+}
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts 
b/airflow/ui/openapi-gen/requests/types.gen.ts
index 603a20d090..f96acacf61 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -347,6 +347,30 @@ export type DagRunType =
   | "manual"
   | "asset_triggered";
 
+/**
+ * DAG Stats Collection serializer for responses.
+ */
+export type DagStatsCollectionResponse = {
+  dags: Array<DagStatsResponse>;
+  total_entries: number;
+};
+
+/**
+ * DAG Stats serializer for responses.
+ */
+export type DagStatsResponse = {
+  dag_id: string;
+  stats: Array<DagStatsStateResponse>;
+};
+
+/**
+ * DagStatsState serializer for responses.
+ */
+export type DagStatsStateResponse = {
+  state: DagRunState;
+  count: number;
+};
+
 /**
  * Serializable representation of the DagTag ORM SqlAlchemyModel used by 
internal API.
  */
@@ -1045,6 +1069,12 @@ export type PostVariableResponse = VariableResponse;
 
 export type GetVersionResponse = VersionInfo;
 
+export type GetDagStatsData = {
+  dagIds?: Array<string>;
+};
+
+export type GetDagStatsResponse = DagStatsCollectionResponse;
+
 export type $OpenApiTs = {
   "/ui/next_run_assets/{dag_id}": {
     get: {
@@ -2165,4 +2195,35 @@ export type $OpenApiTs = {
       };
     };
   };
+  "/public/dagStats/": {
+    get: {
+      req: GetDagStatsData;
+      res: {
+        /**
+         * Successful Response
+         */
+        200: DagStatsCollectionResponse;
+        /**
+         * Bad Request
+         */
+        400: HTTPExceptionResponse;
+        /**
+         * Unauthorized
+         */
+        401: HTTPExceptionResponse;
+        /**
+         * Forbidden
+         */
+        403: HTTPExceptionResponse;
+        /**
+         * Not Found
+         */
+        404: HTTPExceptionResponse;
+        /**
+         * Validation Error
+         */
+        422: HTTPValidationError;
+      };
+    };
+  };
 };
diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_stats.py 
b/tests/api_fastapi/core_api/routes/public/test_dag_stats.py
new file mode 100644
index 0000000000..e2611addd7
--- /dev/null
+++ b/tests/api_fastapi/core_api/routes/public/test_dag_stats.py
@@ -0,0 +1,416 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import datetime, timedelta
+
+import pytest
+
+from airflow.models.dag import DagModel
+from airflow.models.dagrun import DagRun
+from airflow.utils import timezone
+from airflow.utils.state import DagRunState
+from airflow.utils.types import DagRunType
+
+from tests_common.test_utils.db import clear_db_dags, clear_db_runs, 
clear_db_serialized_dags
+
+pytestmark = pytest.mark.db_test
+
+DAG1_ID = "test_dag1"
+DAG2_ID = "test_dag2"
+DAG3_ID = "test_dag3"
+TASK_ID = "op1"
+API_PREFIX = "/public/dagStats"
+
+
+class TestDagStatsEndpoint:
+    default_time = "2020-06-11T18:00:00+00:00"
+
+    @staticmethod
+    def _clear_db():
+        clear_db_runs()
+        clear_db_dags()
+        clear_db_serialized_dags()
+
+    def _create_dag_and_runs(self, session=None):
+        dag_1 = DagModel(
+            dag_id=DAG1_ID,
+            fileloc="/tmp/dag_stats_1.py",
+            timetable_summary="2 2 * * *",
+            is_active=False,
+            is_paused=True,
+            owners="test_owner,another_test_owner",
+            next_dagrun=datetime(2021, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
+        )
+        dag_1_run_1 = DagRun(
+            dag_id=DAG1_ID,
+            run_id="test_dag_run_id_1",
+            run_type=DagRunType.MANUAL,
+            execution_date=timezone.parse(self.default_time),
+            start_date=timezone.parse(self.default_time),
+            external_trigger=True,
+            state="running",
+        )
+        dag_1_run_2 = DagRun(
+            dag_id=dag_1.dag_id,
+            run_id="test_dag_run_id_2",
+            run_type=DagRunType.MANUAL,
+            execution_date=timezone.parse(self.default_time) + 
timedelta(days=1),
+            start_date=timezone.parse(self.default_time),
+            external_trigger=True,
+            state="failed",
+        )
+        dag_2 = DagModel(
+            dag_id=DAG2_ID,
+            fileloc="/tmp/dag_stats_2.py",
+            timetable_summary="2 2 * * *",
+            is_active=False,
+            is_paused=True,
+            owners="test_owner,another_test_owner",
+            next_dagrun=datetime(2021, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
+        )
+        dag_2_run_1 = DagRun(
+            dag_id=dag_2.dag_id,
+            run_id="test_dag_2_run_id_1",
+            run_type=DagRunType.MANUAL,
+            execution_date=timezone.parse(self.default_time),
+            start_date=timezone.parse(self.default_time),
+            external_trigger=True,
+            state="queued",
+        )
+        dag_3 = DagModel(
+            dag_id=DAG3_ID,
+            fileloc="/tmp/dag_stats_3.py",
+            timetable_summary="2 2 * * *",
+            is_active=False,
+            is_paused=True,
+            owners="test_owner,another_test_owner",
+            next_dagrun=datetime(2021, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
+        )
+        dag_3_run_1 = DagRun(
+            dag_id=dag_3.dag_id,
+            run_id="test_dag_3_run_id_1",
+            run_type=DagRunType.MANUAL,
+            execution_date=timezone.parse(self.default_time),
+            start_date=timezone.parse(self.default_time),
+            external_trigger=True,
+            state="success",
+        )
+        entities = (
+            dag_1,
+            dag_1_run_1,
+            dag_1_run_2,
+            dag_2,
+            dag_2_run_1,
+            dag_3,
+            dag_3_run_1,
+        )
+        session.add_all(entities)
+        session.commit()
+
+    @pytest.fixture(autouse=True)
+    def setup(self) -> None:
+        self._clear_db()
+
+    def teardown_method(self) -> None:
+        self._clear_db()
+
+
+class TestGetDagStats(TestDagStatsEndpoint):
+    """Unit tests for Get DAG Stats."""
+
+    def test_should_respond_200(self, client, session):
+        self._create_dag_and_runs(session)
+        exp_payload = {
+            "dags": [
+                {
+                    "dag_id": DAG1_ID,
+                    "stats": [
+                        {
+                            "state": DagRunState.QUEUED,
+                            "count": 0,
+                        },
+                        {
+                            "state": DagRunState.RUNNING,
+                            "count": 1,
+                        },
+                        {
+                            "state": DagRunState.SUCCESS,
+                            "count": 0,
+                        },
+                        {
+                            "state": DagRunState.FAILED,
+                            "count": 1,
+                        },
+                    ],
+                },
+                {
+                    "dag_id": DAG2_ID,
+                    "stats": [
+                        {
+                            "state": DagRunState.QUEUED,
+                            "count": 1,
+                        },
+                        {
+                            "state": DagRunState.RUNNING,
+                            "count": 0,
+                        },
+                        {
+                            "state": DagRunState.SUCCESS,
+                            "count": 0,
+                        },
+                        {
+                            "state": DagRunState.FAILED,
+                            "count": 0,
+                        },
+                    ],
+                },
+            ],
+            "total_entries": 2,
+        }
+
+        response = 
client().get(f"{API_PREFIX}?dag_ids={DAG1_ID}&dag_ids={DAG2_ID}")
+        assert response.status_code == 200
+        res_json = response.json()
+        assert res_json["total_entries"] == len(res_json["dags"])
+        assert res_json == exp_payload
+
+    def test_all_dags_should_respond_200(self, client, session):
+        self._create_dag_and_runs(session)
+        exp_payload = {
+            "dags": [
+                {
+                    "dag_id": DAG1_ID,
+                    "stats": [
+                        {
+                            "state": DagRunState.QUEUED,
+                            "count": 0,
+                        },
+                        {
+                            "state": DagRunState.RUNNING,
+                            "count": 1,
+                        },
+                        {
+                            "state": DagRunState.SUCCESS,
+                            "count": 0,
+                        },
+                        {
+                            "state": DagRunState.FAILED,
+                            "count": 1,
+                        },
+                    ],
+                },
+                {
+                    "dag_id": DAG2_ID,
+                    "stats": [
+                        {
+                            "state": DagRunState.QUEUED,
+                            "count": 1,
+                        },
+                        {
+                            "state": DagRunState.RUNNING,
+                            "count": 0,
+                        },
+                        {
+                            "state": DagRunState.SUCCESS,
+                            "count": 0,
+                        },
+                        {
+                            "state": DagRunState.FAILED,
+                            "count": 0,
+                        },
+                    ],
+                },
+                {
+                    "dag_id": DAG3_ID,
+                    "stats": [
+                        {
+                            "state": DagRunState.QUEUED,
+                            "count": 0,
+                        },
+                        {
+                            "state": DagRunState.RUNNING,
+                            "count": 0,
+                        },
+                        {
+                            "state": DagRunState.SUCCESS,
+                            "count": 1,
+                        },
+                        {
+                            "state": DagRunState.FAILED,
+                            "count": 0,
+                        },
+                    ],
+                },
+            ],
+            "total_entries": 3,
+        }
+
+        response = client().get(API_PREFIX)
+        assert response.status_code == 200
+        res_json = response.json()
+        assert res_json["total_entries"] == len(res_json["dags"])
+        assert res_json == exp_payload
+
+    @pytest.mark.parametrize(
+        "url, params, exp_payload",
+        [
+            (
+                API_PREFIX,
+                [
+                    ("dag_ids", DAG1_ID),
+                    ("dag_ids", DAG3_ID),
+                    ("dag_ids", DAG2_ID),
+                ],
+                {
+                    "dags": [
+                        {
+                            "dag_id": DAG1_ID,
+                            "stats": [
+                                {
+                                    "state": DagRunState.QUEUED,
+                                    "count": 0,
+                                },
+                                {
+                                    "state": DagRunState.RUNNING,
+                                    "count": 1,
+                                },
+                                {
+                                    "state": DagRunState.SUCCESS,
+                                    "count": 0,
+                                },
+                                {
+                                    "state": DagRunState.FAILED,
+                                    "count": 1,
+                                },
+                            ],
+                        },
+                        {
+                            "dag_id": DAG2_ID,
+                            "stats": [
+                                {
+                                    "state": DagRunState.QUEUED,
+                                    "count": 1,
+                                },
+                                {
+                                    "state": DagRunState.RUNNING,
+                                    "count": 0,
+                                },
+                                {
+                                    "state": DagRunState.SUCCESS,
+                                    "count": 0,
+                                },
+                                {
+                                    "state": DagRunState.FAILED,
+                                    "count": 0,
+                                },
+                            ],
+                        },
+                        {
+                            "dag_id": DAG3_ID,
+                            "stats": [
+                                {
+                                    "state": DagRunState.QUEUED,
+                                    "count": 0,
+                                },
+                                {
+                                    "state": DagRunState.RUNNING,
+                                    "count": 0,
+                                },
+                                {
+                                    "state": DagRunState.SUCCESS,
+                                    "count": 1,
+                                },
+                                {
+                                    "state": DagRunState.FAILED,
+                                    "count": 0,
+                                },
+                            ],
+                        },
+                    ],
+                    "total_entries": 3,
+                },
+            ),
+            (
+                API_PREFIX,
+                [("dag_ids", DAG1_ID)],
+                {
+                    "dags": [
+                        {
+                            "dag_id": DAG1_ID,
+                            "stats": [
+                                {
+                                    "state": DagRunState.QUEUED,
+                                    "count": 0,
+                                },
+                                {
+                                    "state": DagRunState.RUNNING,
+                                    "count": 1,
+                                },
+                                {
+                                    "state": DagRunState.SUCCESS,
+                                    "count": 0,
+                                },
+                                {
+                                    "state": DagRunState.FAILED,
+                                    "count": 1,
+                                },
+                            ],
+                        }
+                    ],
+                    "total_entries": 1,
+                },
+            ),
+            (
+                API_PREFIX,
+                [("dag_ids", DAG3_ID)],
+                {
+                    "dags": [
+                        {
+                            "dag_id": DAG3_ID,
+                            "stats": [
+                                {
+                                    "state": DagRunState.QUEUED,
+                                    "count": 0,
+                                },
+                                {
+                                    "state": DagRunState.RUNNING,
+                                    "count": 0,
+                                },
+                                {
+                                    "state": DagRunState.SUCCESS,
+                                    "count": 1,
+                                },
+                                {
+                                    "state": DagRunState.FAILED,
+                                    "count": 0,
+                                },
+                            ],
+                        },
+                    ],
+                    "total_entries": 1,
+                },
+            ),
+        ],
+    )
+    def test_single_dag_in_dag_ids(self, client, session, url, params, 
exp_payload):
+        self._create_dag_and_runs(session)
+        response = client().get(url, params=params)
+        assert response.status_code == 200
+        res_json = response.json()
+        assert res_json["total_entries"] == len(res_json["dags"])
+        assert res_json == exp_payload


Reply via email to