This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 66a89f1bd4 AIP-84 Migrate views /object/historical_metrics_data to 
Fast API (#42629)
66a89f1bd4 is described below

commit 66a89f1bd471cc7038031109dbff3769504f1c71
Author: Bugra Ozturk <[email protected]>
AuthorDate: Wed Oct 9 10:09:40 2024 +0200

    AIP-84 Migrate views /object/historical_metrics_data to Fast API (#42629)
    
    * Include object router and migrate objects/historical_metrics_data to 
FastAPI
    
    * Use pyfixture session
    
    * Include provide_session for SQLite session thread issue
    
    * Make method async and remove unused session from tests
    
    * Include return type to generate proper openapi spec
    
    * Object definition to Dashboard definition, make safe date Annotated 
parameter in parameters.py
    
    * Include pydantic model for Dashboard Historical Metric Data and convert 
response to HistoricalMetricDataResponse
    
    * Fix conflict and rerun pre-commit hooks
    
    * Rename data variable to historical metrics for consistency
    
    * Include object router and migrate objects/historical_metrics_data to 
FastAPI
    
    * Object definition to Dashboard definition, make safe date Annotated 
parameter in parameters.py
    
    * Drop data from method name to prevent double Data in the name of objects
    
    * Variable name change to prevent shadow naming with method name
    
    * Resolve conflicts and rebase again
    
    * Resolve conflicts and rebase
---
 airflow/api_fastapi/openapi/v1-generated.yaml   | 156 ++++++++++++++++++++++++
 airflow/api_fastapi/parameters.py               |  22 ++++
 airflow/api_fastapi/serializers/dashboard.py    |  63 ++++++++++
 airflow/api_fastapi/views/ui/__init__.py        |   2 +
 airflow/api_fastapi/views/ui/dashboard.py       | 100 +++++++++++++++
 airflow/ui/openapi-gen/queries/common.ts        |  23 ++++
 airflow/ui/openapi-gen/queries/prefetch.ts      |  27 ++++
 airflow/ui/openapi-gen/queries/queries.ts       |  38 +++++-
 airflow/ui/openapi-gen/queries/suspense.ts      |  34 ++++++
 airflow/ui/openapi-gen/requests/schemas.gen.ts  | 143 ++++++++++++++++++++++
 airflow/ui/openapi-gen/requests/services.gen.ts |  30 +++++
 airflow/ui/openapi-gen/requests/types.gen.ts    |  74 +++++++++++
 airflow/www/views.py                            |   1 +
 tests/api_fastapi/views/ui/test_dashboard.py    | 151 +++++++++++++++++++++++
 14 files changed, 862 insertions(+), 2 deletions(-)

diff --git a/airflow/api_fastapi/openapi/v1-generated.yaml 
b/airflow/api_fastapi/openapi/v1-generated.yaml
index f3bc8612bf..28723b8008 100644
--- a/airflow/api_fastapi/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/openapi/v1-generated.yaml
@@ -34,6 +34,45 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/HTTPValidationError'
+  /ui/dashboard/historical_metrics_data:
+    get:
+      tags:
+      - Dashboard
+      summary: Historical Metrics
+      description: Return cluster activity historical metrics.
+      operationId: historical_metrics
+      parameters:
+      - name: start_date
+        in: query
+        required: true
+        schema:
+          type: string
+          title: Start Date
+      - name: end_date
+        in: query
+        required: true
+        schema:
+          type: string
+          title: End Date
+      responses:
+        '200':
+          description: Successful Response
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HistoricalMetricDataResponse'
+        '400':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Bad Request
+        '422':
+          description: Validation Error
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPValidationError'
   /public/dags/:
     get:
       tags:
@@ -964,6 +1003,50 @@ components:
       - file_token
       title: DAGResponse
       description: DAG serializer for responses.
+    DAGRunStates:
+      properties:
+        queued:
+          type: integer
+          title: Queued
+        running:
+          type: integer
+          title: Running
+        success:
+          type: integer
+          title: Success
+        failed:
+          type: integer
+          title: Failed
+      type: object
+      required:
+      - queued
+      - running
+      - success
+      - failed
+      title: DAGRunStates
+      description: DAG Run States for responses.
+    DAGRunTypes:
+      properties:
+        backfill:
+          type: integer
+          title: Backfill
+        scheduled:
+          type: integer
+          title: Scheduled
+        manual:
+          type: integer
+          title: Manual
+        dataset_triggered:
+          type: integer
+          title: Dataset Triggered
+      type: object
+      required:
+      - backfill
+      - scheduled
+      - manual
+      - dataset_triggered
+      title: DAGRunTypes
+      description: DAG Run Types for responses.
     DagRunState:
       type: string
       enum:
@@ -1016,6 +1099,79 @@ components:
           title: Detail
       type: object
       title: HTTPValidationError
+    HistoricalMetricDataResponse:
+      properties:
+        dag_run_types:
+          $ref: '#/components/schemas/DAGRunTypes'
+        dag_run_states:
+          $ref: '#/components/schemas/DAGRunStates'
+        task_instance_states:
+          $ref: '#/components/schemas/TaskInstantState'
+      type: object
+      required:
+      - dag_run_types
+      - dag_run_states
+      - task_instance_states
+      title: HistoricalMetricDataResponse
+      description: Historical Metric Data serializer for responses.
+    TaskInstantState:
+      properties:
+        no_status:
+          type: integer
+          title: No Status
+        removed:
+          type: integer
+          title: Removed
+        scheduled:
+          type: integer
+          title: Scheduled
+        queued:
+          type: integer
+          title: Queued
+        running:
+          type: integer
+          title: Running
+        success:
+          type: integer
+          title: Success
+        restarting:
+          type: integer
+          title: Restarting
+        failed:
+          type: integer
+          title: Failed
+        up_for_retry:
+          type: integer
+          title: Up For Retry
+        up_for_reschedule:
+          type: integer
+          title: Up For Reschedule
+        upstream_failed:
+          type: integer
+          title: Upstream Failed
+        skipped:
+          type: integer
+          title: Skipped
+        deferred:
+          type: integer
+          title: Deferred
+      type: object
+      required:
+      - no_status
+      - removed
+      - scheduled
+      - queued
+      - running
+      - success
+      - restarting
+      - failed
+      - up_for_retry
+      - up_for_reschedule
+      - upstream_failed
+      - skipped
+      - deferred
+      title: TaskInstantState
+      description: TaskInstance serializer for responses.
     ValidationError:
       properties:
         loc:
diff --git a/airflow/api_fastapi/parameters.py 
b/airflow/api_fastapi/parameters.py
index 504014602f..59d61ad686 100644
--- a/airflow/api_fastapi/parameters.py
+++ b/airflow/api_fastapi/parameters.py
@@ -18,14 +18,18 @@
 from __future__ import annotations
 
 from abc import ABC, abstractmethod
+from datetime import datetime
 from typing import TYPE_CHECKING, Any, Generic, List, TypeVar
 
 from fastapi import Depends, HTTPException, Query
+from pendulum.parsing.exceptions import ParserError
+from pydantic import AfterValidator
 from sqlalchemy import case, or_
 from typing_extensions import Annotated, Self
 
 from airflow.models.dag import DagModel, DagTag
 from airflow.models.dagrun import DagRun
+from airflow.utils import timezone
 from airflow.utils.state import DagRunState
 
 if TYPE_CHECKING:
@@ -235,6 +239,24 @@ class _LastDagRunStateFilter(BaseParam[DagRunState]):
         return self.set_value(last_dag_run_state)
 
 
+def _safe_parse_datetime(date_to_check: str) -> datetime:
+    """
+    Parse datetime and raise error for invalid dates.
+
+    :param date_to_check: the string value to be parsed
+    """
+    if not date_to_check:
+        raise ValueError(f"{date_to_check} cannot be None.")
+    try:
+        return timezone.parse(date_to_check, strict=True)
+    except (TypeError, ParserError):
+        raise HTTPException(
+            400, f"Invalid datetime: {date_to_check!r}. Please check the date 
parameter have this value."
+        )
+
+
+# Common Safe DateTime
+DateTimeQuery = Annotated[str, AfterValidator(_safe_parse_datetime)]
 # DAG
 QueryLimit = Annotated[_LimitFilter, Depends(_LimitFilter().depends)]
 QueryOffset = Annotated[_OffsetFilter, Depends(_OffsetFilter().depends)]
diff --git a/airflow/api_fastapi/serializers/dashboard.py 
b/airflow/api_fastapi/serializers/dashboard.py
new file mode 100644
index 0000000000..f5a38fa22e
--- /dev/null
+++ b/airflow/api_fastapi/serializers/dashboard.py
@@ -0,0 +1,63 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from pydantic import BaseModel
+
+
+class DAGRunTypes(BaseModel):
+    """DAG Run Types for responses."""
+
+    backfill: int
+    scheduled: int
+    manual: int
+    dataset_triggered: int
+
+
+class DAGRunStates(BaseModel):
+    """DAG Run States for responses."""
+
+    queued: int
+    running: int
+    success: int
+    failed: int
+
+
+class TaskInstantState(BaseModel):
+    """TaskInstance serializer for responses."""
+
+    no_status: int
+    removed: int
+    scheduled: int
+    queued: int
+    running: int
+    success: int
+    restarting: int
+    failed: int
+    up_for_retry: int
+    up_for_reschedule: int
+    upstream_failed: int
+    skipped: int
+    deferred: int
+
+
+class HistoricalMetricDataResponse(BaseModel):
+    """Historical Metric Data serializer for responses."""
+
+    dag_run_types: DAGRunTypes
+    dag_run_states: DAGRunStates
+    task_instance_states: TaskInstantState
diff --git a/airflow/api_fastapi/views/ui/__init__.py 
b/airflow/api_fastapi/views/ui/__init__.py
index 8495ac5e5e..f01686cc99 100644
--- a/airflow/api_fastapi/views/ui/__init__.py
+++ b/airflow/api_fastapi/views/ui/__init__.py
@@ -18,7 +18,9 @@ from __future__ import annotations
 
 from airflow.api_fastapi.views.router import AirflowRouter
 from airflow.api_fastapi.views.ui.assets import assets_router
+from airflow.api_fastapi.views.ui.dashboard import dashboard_router
 
 ui_router = AirflowRouter(prefix="/ui")
 
 ui_router.include_router(assets_router)
+ui_router.include_router(dashboard_router)
diff --git a/airflow/api_fastapi/views/ui/dashboard.py 
b/airflow/api_fastapi/views/ui/dashboard.py
new file mode 100644
index 0000000000..0d6b69a1ce
--- /dev/null
+++ b/airflow/api_fastapi/views/ui/dashboard.py
@@ -0,0 +1,100 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from fastapi import Depends
+from sqlalchemy import func, select
+from sqlalchemy.orm import Session
+from typing_extensions import Annotated
+
+from airflow.api_fastapi.openapi.exceptions import 
create_openapi_http_exception_doc
+from airflow.api_fastapi.parameters import DateTimeQuery
+from airflow.api_fastapi.serializers.dashboard import 
HistoricalMetricDataResponse
+from airflow.models.dagrun import DagRun, DagRunType
+from airflow.models.taskinstance import TaskInstance
+from airflow.utils.state import DagRunState, TaskInstanceState
+
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Session
+from airflow.api_fastapi.db.common import get_session
+from airflow.api_fastapi.views.router import AirflowRouter
+from airflow.utils import timezone
+
+dashboard_router = AirflowRouter(tags=["Dashboard"])
+
+
+@dashboard_router.get(
+    "/dashboard/historical_metrics_data",
+    include_in_schema=False,
+    responses=create_openapi_http_exception_doc([400]),
+)
+async def historical_metrics(
+    start_date: DateTimeQuery,
+    end_date: DateTimeQuery,
+    session: Annotated[Session, Depends(get_session)],
+) -> HistoricalMetricDataResponse:
+    """Return cluster activity historical metrics."""
+    # DagRuns
+    dag_run_types = session.execute(
+        select(DagRun.run_type, func.count(DagRun.run_id))
+        .where(
+            DagRun.start_date >= start_date,
+            func.coalesce(DagRun.end_date, timezone.utcnow()) <= end_date,
+        )
+        .group_by(DagRun.run_type)
+    ).all()
+
+    dag_run_states = session.execute(
+        select(DagRun.state, func.count(DagRun.run_id))
+        .where(
+            DagRun.start_date >= start_date,
+            func.coalesce(DagRun.end_date, timezone.utcnow()) <= end_date,
+        )
+        .group_by(DagRun.state)
+    ).all()
+
+    # TaskInstances
+    task_instance_states = session.execute(
+        select(TaskInstance.state, func.count(TaskInstance.run_id))
+        .join(TaskInstance.dag_run)
+        .where(
+            DagRun.start_date >= start_date,
+            func.coalesce(DagRun.end_date, timezone.utcnow()) <= end_date,
+        )
+        .group_by(TaskInstance.state)
+    ).all()
+
+    # Combining historical metrics response as dictionary
+    historical_metrics_response = {
+        "dag_run_types": {
+            **{dag_run_type.value: 0 for dag_run_type in DagRunType},
+            **dict(dag_run_types),
+        },
+        "dag_run_states": {
+            **{dag_run_state.value: 0 for dag_run_state in DagRunState},
+            **dict(dag_run_states),
+        },
+        "task_instance_states": {
+            "no_status": 0,
+            **{ti_state.value: 0 for ti_state in TaskInstanceState},
+            **{ti_state or "no_status": sum_value for ti_state, sum_value in 
task_instance_states},
+        },
+    }
+
+    return 
HistoricalMetricDataResponse.model_validate(historical_metrics_response, 
from_attributes=True)
diff --git a/airflow/ui/openapi-gen/queries/common.ts 
b/airflow/ui/openapi-gen/queries/common.ts
index afd7afd5bc..fbbbac5d60 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -5,6 +5,7 @@ import {
   AssetService,
   ConnectionService,
   DagService,
+  DashboardService,
   VariableService,
 } from "../requests/services.gen";
 import { DagRunState } from "../requests/types.gen";
@@ -25,6 +26,28 @@ export const UseAssetServiceNextRunAssetsKeyFn = (
   },
   queryKey?: Array<unknown>,
 ) => [useAssetServiceNextRunAssetsKey, ...(queryKey ?? [{ dagId }])];
+export type DashboardServiceHistoricalMetricsDefaultResponse = Awaited<
+  ReturnType<typeof DashboardService.historicalMetrics>
+>;
+export type DashboardServiceHistoricalMetricsQueryResult<
+  TData = DashboardServiceHistoricalMetricsDefaultResponse,
+  TError = unknown,
+> = UseQueryResult<TData, TError>;
+export const useDashboardServiceHistoricalMetricsKey =
+  "DashboardServiceHistoricalMetrics";
+export const UseDashboardServiceHistoricalMetricsKeyFn = (
+  {
+    endDate,
+    startDate,
+  }: {
+    endDate: string;
+    startDate: string;
+  },
+  queryKey?: Array<unknown>,
+) => [
+  useDashboardServiceHistoricalMetricsKey,
+  ...(queryKey ?? [{ endDate, startDate }]),
+];
 export type DagServiceGetDagsDefaultResponse = Awaited<
   ReturnType<typeof DagService.getDags>
 >;
diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts 
b/airflow/ui/openapi-gen/queries/prefetch.ts
index cbb43cca3a..7c8555b29d 100644
--- a/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -5,6 +5,7 @@ import {
   AssetService,
   ConnectionService,
   DagService,
+  DashboardService,
 } from "../requests/services.gen";
 import { DagRunState } from "../requests/types.gen";
 import * as Common from "./common";
@@ -28,6 +29,32 @@ export const prefetchUseAssetServiceNextRunAssets = (
     queryKey: Common.UseAssetServiceNextRunAssetsKeyFn({ dagId }),
     queryFn: () => AssetService.nextRunAssets({ dagId }),
   });
+/**
+ * Historical Metrics
+ * Return cluster activity historical metrics.
+ * @param data The data for the request.
+ * @param data.startDate
+ * @param data.endDate
+ * @returns HistoricalMetricDataResponse Successful Response
+ * @throws ApiError
+ */
+export const prefetchUseDashboardServiceHistoricalMetrics = (
+  queryClient: QueryClient,
+  {
+    endDate,
+    startDate,
+  }: {
+    endDate: string;
+    startDate: string;
+  },
+) =>
+  queryClient.prefetchQuery({
+    queryKey: Common.UseDashboardServiceHistoricalMetricsKeyFn({
+      endDate,
+      startDate,
+    }),
+    queryFn: () => DashboardService.historicalMetrics({ endDate, startDate }),
+  });
 /**
  * Get Dags
  * Get all DAGs.
diff --git a/airflow/ui/openapi-gen/queries/queries.ts 
b/airflow/ui/openapi-gen/queries/queries.ts
index 22d58eadda..9137ea4ed0 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -1,15 +1,16 @@
 // generated with @7nohe/[email protected]
 import {
-  useMutation,
   UseMutationOptions,
-  useQuery,
   UseQueryOptions,
+  useMutation,
+  useQuery,
 } from "@tanstack/react-query";
 
 import {
   AssetService,
   ConnectionService,
   DagService,
+  DashboardService,
   VariableService,
 } from "../requests/services.gen";
 import { DAGPatchBody, DagRunState } from "../requests/types.gen";
@@ -40,6 +41,39 @@ export const useAssetServiceNextRunAssets = <
     queryFn: () => AssetService.nextRunAssets({ dagId }) as TData,
     ...options,
   });
+/**
+ * Historical Metrics
+ * Return cluster activity historical metrics.
+ * @param data The data for the request.
+ * @param data.startDate
+ * @param data.endDate
+ * @returns HistoricalMetricDataResponse Successful Response
+ * @throws ApiError
+ */
+export const useDashboardServiceHistoricalMetrics = <
+  TData = Common.DashboardServiceHistoricalMetricsDefaultResponse,
+  TError = unknown,
+  TQueryKey extends Array<unknown> = unknown[],
+>(
+  {
+    endDate,
+    startDate,
+  }: {
+    endDate: string;
+    startDate: string;
+  },
+  queryKey?: TQueryKey,
+  options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+  useQuery<TData, TError>({
+    queryKey: Common.UseDashboardServiceHistoricalMetricsKeyFn(
+      { endDate, startDate },
+      queryKey,
+    ),
+    queryFn: () =>
+      DashboardService.historicalMetrics({ endDate, startDate }) as TData,
+    ...options,
+  });
 /**
  * Get Dags
  * Get all DAGs.
diff --git a/airflow/ui/openapi-gen/queries/suspense.ts 
b/airflow/ui/openapi-gen/queries/suspense.ts
index 04d7eb94b3..70be4beb0d 100644
--- a/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow/ui/openapi-gen/queries/suspense.ts
@@ -5,6 +5,7 @@ import {
   AssetService,
   ConnectionService,
   DagService,
+  DashboardService,
 } from "../requests/services.gen";
 import { DagRunState } from "../requests/types.gen";
 import * as Common from "./common";
@@ -34,6 +35,39 @@ export const useAssetServiceNextRunAssetsSuspense = <
     queryFn: () => AssetService.nextRunAssets({ dagId }) as TData,
     ...options,
   });
+/**
+ * Historical Metrics
+ * Return cluster activity historical metrics.
+ * @param data The data for the request.
+ * @param data.startDate
+ * @param data.endDate
+ * @returns HistoricalMetricDataResponse Successful Response
+ * @throws ApiError
+ */
+export const useDashboardServiceHistoricalMetricsSuspense = <
+  TData = Common.DashboardServiceHistoricalMetricsDefaultResponse,
+  TError = unknown,
+  TQueryKey extends Array<unknown> = unknown[],
+>(
+  {
+    endDate,
+    startDate,
+  }: {
+    endDate: string;
+    startDate: string;
+  },
+  queryKey?: TQueryKey,
+  options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+  useSuspenseQuery<TData, TError>({
+    queryKey: Common.UseDashboardServiceHistoricalMetricsKeyFn(
+      { endDate, startDate },
+      queryKey,
+    ),
+    queryFn: () =>
+      DashboardService.historicalMetrics({ endDate, startDate }) as TData,
+    ...options,
+  });
 /**
  * Get Dags
  * Get all DAGs.
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts 
b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 910354423b..26fe1180ae 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -784,6 +784,56 @@ export const $DAGResponse = {
   description: "DAG serializer for responses.",
 } as const;
 
+export const $DAGRunStates = {
+  properties: {
+    queued: {
+      type: "integer",
+      title: "Queued",
+    },
+    running: {
+      type: "integer",
+      title: "Running",
+    },
+    success: {
+      type: "integer",
+      title: "Success",
+    },
+    failed: {
+      type: "integer",
+      title: "Failed",
+    },
+  },
+  type: "object",
+  required: ["queued", "running", "success", "failed"],
+  title: "DAGRunStates",
+  description: "DAG Run States for responses.",
+} as const;
+
+export const $DAGRunTypes = {
+  properties: {
+    backfill: {
+      type: "integer",
+      title: "Backfill",
+    },
+    scheduled: {
+      type: "integer",
+      title: "Scheduled",
+    },
+    manual: {
+      type: "integer",
+      title: "Manual",
+    },
+    dataset_triggered: {
+      type: "integer",
+      title: "Dataset Triggered",
+    },
+  },
+  type: "object",
+  required: ["backfill", "scheduled", "manual", "dataset_triggered"],
+  title: "DAGRunTypes",
+  description: "DAG Run Types for responses.",
+} as const;
+
 export const $DagRunState = {
   type: "string",
   enum: ["queued", "running", "success", "failed"],
@@ -847,6 +897,99 @@ export const $HTTPValidationError = {
   title: "HTTPValidationError",
 } as const;
 
+export const $HistoricalMetricDataResponse = {
+  properties: {
+    dag_run_types: {
+      $ref: "#/components/schemas/DAGRunTypes",
+    },
+    dag_run_states: {
+      $ref: "#/components/schemas/DAGRunStates",
+    },
+    task_instance_states: {
+      $ref: "#/components/schemas/TaskInstantState",
+    },
+  },
+  type: "object",
+  required: ["dag_run_types", "dag_run_states", "task_instance_states"],
+  title: "HistoricalMetricDataResponse",
+  description: "Historical Metric Data serializer for responses.",
+} as const;
+
+export const $TaskInstantState = {
+  properties: {
+    no_status: {
+      type: "integer",
+      title: "No Status",
+    },
+    removed: {
+      type: "integer",
+      title: "Removed",
+    },
+    scheduled: {
+      type: "integer",
+      title: "Scheduled",
+    },
+    queued: {
+      type: "integer",
+      title: "Queued",
+    },
+    running: {
+      type: "integer",
+      title: "Running",
+    },
+    success: {
+      type: "integer",
+      title: "Success",
+    },
+    restarting: {
+      type: "integer",
+      title: "Restarting",
+    },
+    failed: {
+      type: "integer",
+      title: "Failed",
+    },
+    up_for_retry: {
+      type: "integer",
+      title: "Up For Retry",
+    },
+    up_for_reschedule: {
+      type: "integer",
+      title: "Up For Reschedule",
+    },
+    upstream_failed: {
+      type: "integer",
+      title: "Upstream Failed",
+    },
+    skipped: {
+      type: "integer",
+      title: "Skipped",
+    },
+    deferred: {
+      type: "integer",
+      title: "Deferred",
+    },
+  },
+  type: "object",
+  required: [
+    "no_status",
+    "removed",
+    "scheduled",
+    "queued",
+    "running",
+    "success",
+    "restarting",
+    "failed",
+    "up_for_retry",
+    "up_for_reschedule",
+    "upstream_failed",
+    "skipped",
+    "deferred",
+  ],
+  title: "TaskInstantState",
+  description: "TaskInstance serializer for responses.",
+} as const;
+
 export const $ValidationError = {
   properties: {
     loc: {
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts 
b/airflow/ui/openapi-gen/requests/services.gen.ts
index 72fd2f68f1..268f636404 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -5,6 +5,8 @@ import { request as __request } from "./core/request";
 import type {
   NextRunAssetsData,
   NextRunAssetsResponse,
+  HistoricalMetricsData,
+  HistoricalMetricsResponse,
   GetDagsData,
   GetDagsResponse,
   PatchDagsData,
@@ -45,6 +47,34 @@ export class AssetService {
   }
 }
 
+export class DashboardService {
+  /**
+   * Historical Metrics
+   * Return cluster activity historical metrics.
+   * @param data The data for the request.
+   * @param data.startDate
+   * @param data.endDate
+   * @returns HistoricalMetricDataResponse Successful Response
+   * @throws ApiError
+   */
+  public static historicalMetrics(
+    data: HistoricalMetricsData,
+  ): CancelablePromise<HistoricalMetricsResponse> {
+    return __request(OpenAPI, {
+      method: "GET",
+      url: "/ui/dashboard/historical_metrics_data",
+      query: {
+        start_date: data.startDate,
+        end_date: data.endDate,
+      },
+      errors: {
+        400: "Bad Request",
+        422: "Validation Error",
+      },
+    });
+  }
+}
+
 export class DagService {
   /**
    * Get Dags
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts 
b/airflow/ui/openapi-gen/requests/types.gen.ts
index d07d980397..268960a596 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -120,6 +120,26 @@ export type DAGResponse = {
   readonly file_token: string;
 };
 
+/**
+ * DAG Run States for responses.
+ */
+export type DAGRunStates = {
+  queued: number;
+  running: number;
+  success: number;
+  failed: number;
+};
+
+/**
+ * DAG Run Types for responses.
+ */
+export type DAGRunTypes = {
+  backfill: number;
+  scheduled: number;
+  manual: number;
+  dataset_triggered: number;
+};
+
 /**
  * All possible states that a DagRun can be in.
  *
@@ -152,6 +172,34 @@ export type HTTPValidationError = {
   detail?: Array<ValidationError>;
 };
 
+/**
+ * Historical Metric Data serializer for responses.
+ */
+export type HistoricalMetricDataResponse = {
+  dag_run_types: DAGRunTypes;
+  dag_run_states: DAGRunStates;
+  task_instance_states: TaskInstantState;
+};
+
+/**
+ * TaskInstance serializer for responses.
+ */
+export type TaskInstantState = {
+  no_status: number;
+  removed: number;
+  scheduled: number;
+  queued: number;
+  running: number;
+  success: number;
+  restarting: number;
+  failed: number;
+  up_for_retry: number;
+  up_for_reschedule: number;
+  upstream_failed: number;
+  skipped: number;
+  deferred: number;
+};
+
 export type ValidationError = {
   loc: Array<string | number>;
   msg: string;
@@ -166,6 +214,13 @@ export type NextRunAssetsResponse = {
   [key: string]: unknown;
 };
 
+export type HistoricalMetricsData = {
+  endDate: string;
+  startDate: string;
+};
+
+export type HistoricalMetricsResponse = HistoricalMetricDataResponse;
+
 export type GetDagsData = {
   dagDisplayNamePattern?: string | null;
   dagIdPattern?: string | null;
@@ -246,6 +301,25 @@ export type $OpenApiTs = {
       };
     };
   };
+  "/ui/dashboard/historical_metrics_data": {
+    get: {
+      req: HistoricalMetricsData;
+      res: {
+        /**
+         * Successful Response
+         */
+        200: HistoricalMetricDataResponse;
+        /**
+         * Bad Request
+         */
+        400: HTTPExceptionResponse;
+        /**
+         * Validation Error
+         */
+        422: HTTPValidationError;
+      };
+    };
+  };
   "/public/dags/": {
     get: {
       req: GetDagsData;
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 7782da955c..8dba4c4fcc 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -3348,6 +3348,7 @@ class Airflow(AirflowBaseView):
 
     @expose("/object/historical_metrics_data")
     @auth.has_access_view(AccessView.CLUSTER_ACTIVITY)
+    @mark_fastapi_migration_done
     def historical_metrics_data(self):
         """Return cluster activity historical metrics."""
         start_date = _safe_parse_datetime(request.args.get("start_date"))
diff --git a/tests/api_fastapi/views/ui/test_dashboard.py 
b/tests/api_fastapi/views/ui/test_dashboard.py
new file mode 100644
index 0000000000..970b79ad35
--- /dev/null
+++ b/tests/api_fastapi/views/ui/test_dashboard.py
@@ -0,0 +1,151 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import timedelta
+
+import pendulum
+import pytest
+
+from airflow.models import DagBag
+from airflow.operators.empty import EmptyOperator
+from airflow.utils.state import DagRunState, TaskInstanceState
+from airflow.utils.types import DagRunType
+from tests.test_utils.db import clear_db_runs
+
+pytestmark = pytest.mark.db_test
+
+
[email protected](autouse=True, scope="module")
+def examples_dag_bag():
+    # Speed up: We don't want example dags for this module
+
+    return DagBag(include_examples=False, read_dags_from_db=True)
+
+
[email protected](autouse=True)
+def clean():
+    clear_db_runs()
+    yield
+    clear_db_runs()
+
+
+# freeze time fixture so that it is applied before `make_dag_runs` is!
[email protected]
+def freeze_time_for_dagruns(time_machine):
+    time_machine.move_to("2023-05-02T00:00:00+00:00", tick=False)
+
+
[email protected]
+def make_dag_runs(dag_maker, session, time_machine):
+    with dag_maker(
+        dag_id="test_dag_id",
+        serialized=True,
+        session=session,
+        start_date=pendulum.DateTime(2023, 2, 1, 0, 0, 0, tzinfo=pendulum.UTC),
+    ):
+        EmptyOperator(task_id="task_1") >> EmptyOperator(task_id="task_2")
+
+    date = dag_maker.dag.start_date
+
+    run1 = dag_maker.create_dagrun(
+        run_id="run_1",
+        state=DagRunState.SUCCESS,
+        run_type=DagRunType.SCHEDULED,
+        execution_date=date,
+        start_date=date,
+    )
+
+    run2 = dag_maker.create_dagrun(
+        run_id="run_2",
+        state=DagRunState.FAILED,
+        run_type=DagRunType.DATASET_TRIGGERED,
+        execution_date=date + timedelta(days=1),
+        start_date=date + timedelta(days=1),
+    )
+
+    run3 = dag_maker.create_dagrun(
+        run_id="run_3",
+        state=DagRunState.RUNNING,
+        run_type=DagRunType.SCHEDULED,
+        execution_date=pendulum.DateTime(2023, 2, 3, 0, 0, 0, 
tzinfo=pendulum.UTC),
+        start_date=pendulum.DateTime(2023, 2, 3, 0, 0, 0, tzinfo=pendulum.UTC),
+    )
+    run3.end_date = None
+
+    for ti in run1.task_instances:
+        ti.state = TaskInstanceState.SUCCESS
+
+    for ti in run2.task_instances:
+        ti.state = TaskInstanceState.FAILED
+
+    dag_maker.dagbag.sync_to_db()
+    time_machine.move_to("2023-07-02T00:00:00+00:00", tick=False)
+
+
+class TestHistoricalMetricsDataEndpoint:
+    @pytest.mark.usefixtures("freeze_time_for_dagruns", "make_dag_runs")
+    def test_historical_metrics_data(self, test_client, time_machine):
+        params = {"start_date": "2023-01-01T00:00", "end_date": 
"2023-08-02T00:00"}
+        response = test_client.get("/ui/dashboard/historical_metrics_data", 
params=params)
+
+        assert response.status_code == 200
+        assert response.json() == {
+            "dag_run_states": {"failed": 1, "queued": 0, "running": 1, 
"success": 1},
+            "dag_run_types": {"backfill": 0, "dataset_triggered": 1, "manual": 
0, "scheduled": 2},
+            "task_instance_states": {
+                "deferred": 0,
+                "failed": 2,
+                "no_status": 2,
+                "queued": 0,
+                "removed": 0,
+                "restarting": 0,
+                "running": 0,
+                "scheduled": 0,
+                "skipped": 0,
+                "success": 2,
+                "up_for_reschedule": 0,
+                "up_for_retry": 0,
+                "upstream_failed": 0,
+            },
+        }
+
+    @pytest.mark.usefixtures("freeze_time_for_dagruns", "make_dag_runs")
+    def test_historical_metrics_data_date_filters(self, test_client):
+        params = {"start_date": "2023-02-02T00:00", "end_date": 
"2023-06-02T00:00"}
+        response = test_client.get("/ui/dashboard/historical_metrics_data", 
params=params)
+        assert response.status_code == 200
+        assert response.json() == {
+            "dag_run_states": {"failed": 1, "queued": 0, "running": 0, 
"success": 0},
+            "dag_run_types": {"backfill": 0, "dataset_triggered": 1, "manual": 
0, "scheduled": 0},
+            "task_instance_states": {
+                "deferred": 0,
+                "failed": 2,
+                "no_status": 0,
+                "queued": 0,
+                "removed": 0,
+                "restarting": 0,
+                "running": 0,
+                "scheduled": 0,
+                "skipped": 0,
+                "success": 0,
+                "up_for_reschedule": 0,
+                "up_for_retry": 0,
+                "upstream_failed": 0,
+            },
+        }


Reply via email to