This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 66a89f1bd4 AIP-84 Migrate views /object/historical_metrics_data to
Fast API (#42629)
66a89f1bd4 is described below
commit 66a89f1bd471cc7038031109dbff3769504f1c71
Author: Bugra Ozturk <[email protected]>
AuthorDate: Wed Oct 9 10:09:40 2024 +0200
AIP-84 Migrate views /object/historical_metrics_data to Fast API (#42629)
* Include object router and migrate objects/historical_metrics_data to
FastAPI
* Use pyfixture session
* Include provide_session for SQLite session thread issue
* Make method async and remove unused session from tests
* Include return type to generate proper openapi spec
* Object definition to Dashboard definition, make safe date Annotated
parameter in parameters.py
* Include pydantic model for Dashboard Historical Metric Data and convert
response to HistoricalMetricDataResponse
* Fix conflict and rerun pre-commit hooks
* Rename data variable to historical metrics for consistency
* Include object router and migrate objects/historical_metrics_data to
FastAPI
* Object definition to Dashboard definition, make safe date Annotated
parameter in parameters.py
* Drop data from method name to prevent double Data in the name of objects
* Variable name change to prevent shadow naming with method name
* Resolve conflicts and rebase again
* Resolve conflicts and rebase
---
airflow/api_fastapi/openapi/v1-generated.yaml | 156 ++++++++++++++++++++++++
airflow/api_fastapi/parameters.py | 22 ++++
airflow/api_fastapi/serializers/dashboard.py | 63 ++++++++++
airflow/api_fastapi/views/ui/__init__.py | 2 +
airflow/api_fastapi/views/ui/dashboard.py | 100 +++++++++++++++
airflow/ui/openapi-gen/queries/common.ts | 23 ++++
airflow/ui/openapi-gen/queries/prefetch.ts | 27 ++++
airflow/ui/openapi-gen/queries/queries.ts | 38 +++++-
airflow/ui/openapi-gen/queries/suspense.ts | 34 ++++++
airflow/ui/openapi-gen/requests/schemas.gen.ts | 143 ++++++++++++++++++++++
airflow/ui/openapi-gen/requests/services.gen.ts | 30 +++++
airflow/ui/openapi-gen/requests/types.gen.ts | 74 +++++++++++
airflow/www/views.py | 1 +
tests/api_fastapi/views/ui/test_dashboard.py | 151 +++++++++++++++++++++++
14 files changed, 862 insertions(+), 2 deletions(-)
diff --git a/airflow/api_fastapi/openapi/v1-generated.yaml
b/airflow/api_fastapi/openapi/v1-generated.yaml
index f3bc8612bf..28723b8008 100644
--- a/airflow/api_fastapi/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/openapi/v1-generated.yaml
@@ -34,6 +34,45 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
+ /ui/dashboard/historical_metrics_data:
+ get:
+ tags:
+ - Dashboard
+ summary: Historical Metrics
+ description: Return cluster activity historical metrics.
+ operationId: historical_metrics
+ parameters:
+ - name: start_date
+ in: query
+ required: true
+ schema:
+ type: string
+ title: Start Date
+ - name: end_date
+ in: query
+ required: true
+ schema:
+ type: string
+ title: End Date
+ responses:
+ '200':
+ description: Successful Response
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HistoricalMetricDataResponse'
+ '400':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Bad Request
+ '422':
+ description: Validation Error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPValidationError'
/public/dags/:
get:
tags:
@@ -964,6 +1003,50 @@ components:
- file_token
title: DAGResponse
description: DAG serializer for responses.
+ DAGRunStates:
+ properties:
+ queued:
+ type: integer
+ title: Queued
+ running:
+ type: integer
+ title: Running
+ success:
+ type: integer
+ title: Success
+ failed:
+ type: integer
+ title: Failed
+ type: object
+ required:
+ - queued
+ - running
+ - success
+ - failed
+ title: DAGRunStates
+ description: DAG Run States for responses.
+ DAGRunTypes:
+ properties:
+ backfill:
+ type: integer
+ title: Backfill
+ scheduled:
+ type: integer
+ title: Scheduled
+ manual:
+ type: integer
+ title: Manual
+ dataset_triggered:
+ type: integer
+ title: Dataset Triggered
+ type: object
+ required:
+ - backfill
+ - scheduled
+ - manual
+ - dataset_triggered
+ title: DAGRunTypes
+ description: DAG Run Types for responses.
DagRunState:
type: string
enum:
@@ -1016,6 +1099,79 @@ components:
title: Detail
type: object
title: HTTPValidationError
+ HistoricalMetricDataResponse:
+ properties:
+ dag_run_types:
+ $ref: '#/components/schemas/DAGRunTypes'
+ dag_run_states:
+ $ref: '#/components/schemas/DAGRunStates'
+ task_instance_states:
+ $ref: '#/components/schemas/TaskInstantState'
+ type: object
+ required:
+ - dag_run_types
+ - dag_run_states
+ - task_instance_states
+ title: HistoricalMetricDataResponse
+ description: Historical Metric Data serializer for responses.
+ TaskInstantState:
+ properties:
+ no_status:
+ type: integer
+ title: No Status
+ removed:
+ type: integer
+ title: Removed
+ scheduled:
+ type: integer
+ title: Scheduled
+ queued:
+ type: integer
+ title: Queued
+ running:
+ type: integer
+ title: Running
+ success:
+ type: integer
+ title: Success
+ restarting:
+ type: integer
+ title: Restarting
+ failed:
+ type: integer
+ title: Failed
+ up_for_retry:
+ type: integer
+ title: Up For Retry
+ up_for_reschedule:
+ type: integer
+ title: Up For Reschedule
+ upstream_failed:
+ type: integer
+ title: Upstream Failed
+ skipped:
+ type: integer
+ title: Skipped
+ deferred:
+ type: integer
+ title: Deferred
+ type: object
+ required:
+ - no_status
+ - removed
+ - scheduled
+ - queued
+ - running
+ - success
+ - restarting
+ - failed
+ - up_for_retry
+ - up_for_reschedule
+ - upstream_failed
+ - skipped
+ - deferred
+ title: TaskInstantState
+ description: TaskInstance serializer for responses.
ValidationError:
properties:
loc:
diff --git a/airflow/api_fastapi/parameters.py
b/airflow/api_fastapi/parameters.py
index 504014602f..59d61ad686 100644
--- a/airflow/api_fastapi/parameters.py
+++ b/airflow/api_fastapi/parameters.py
@@ -18,14 +18,18 @@
from __future__ import annotations
from abc import ABC, abstractmethod
+from datetime import datetime
from typing import TYPE_CHECKING, Any, Generic, List, TypeVar
from fastapi import Depends, HTTPException, Query
+from pendulum.parsing.exceptions import ParserError
+from pydantic import AfterValidator
from sqlalchemy import case, or_
from typing_extensions import Annotated, Self
from airflow.models.dag import DagModel, DagTag
from airflow.models.dagrun import DagRun
+from airflow.utils import timezone
from airflow.utils.state import DagRunState
if TYPE_CHECKING:
@@ -235,6 +239,24 @@ class _LastDagRunStateFilter(BaseParam[DagRunState]):
return self.set_value(last_dag_run_state)
+def _safe_parse_datetime(date_to_check: str) -> datetime:
+ """
+ Parse datetime and raise error for invalid dates.
+
+ :param date_to_check: the string value to be parsed
+ """
+ if not date_to_check:
+ raise ValueError(f"{date_to_check} cannot be None.")
+ try:
+ return timezone.parse(date_to_check, strict=True)
+ except (TypeError, ParserError):
+ raise HTTPException(
+ 400, f"Invalid datetime: {date_to_check!r}. Please check the date parameter have this value."
+ )
+
+
+# Common Safe DateTime
+DateTimeQuery = Annotated[str, AfterValidator(_safe_parse_datetime)]
# DAG
QueryLimit = Annotated[_LimitFilter, Depends(_LimitFilter().depends)]
QueryOffset = Annotated[_OffsetFilter, Depends(_OffsetFilter().depends)]
diff --git a/airflow/api_fastapi/serializers/dashboard.py
b/airflow/api_fastapi/serializers/dashboard.py
new file mode 100644
index 0000000000..f5a38fa22e
--- /dev/null
+++ b/airflow/api_fastapi/serializers/dashboard.py
@@ -0,0 +1,63 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from pydantic import BaseModel
+
+
+class DAGRunTypes(BaseModel):
+ """DAG Run Types for responses."""
+
+ backfill: int
+ scheduled: int
+ manual: int
+ dataset_triggered: int
+
+
+class DAGRunStates(BaseModel):
+ """DAG Run States for responses."""
+
+ queued: int
+ running: int
+ success: int
+ failed: int
+
+
+class TaskInstantState(BaseModel):
+ """TaskInstance serializer for responses."""
+
+ no_status: int
+ removed: int
+ scheduled: int
+ queued: int
+ running: int
+ success: int
+ restarting: int
+ failed: int
+ up_for_retry: int
+ up_for_reschedule: int
+ upstream_failed: int
+ skipped: int
+ deferred: int
+
+
+class HistoricalMetricDataResponse(BaseModel):
+ """Historical Metric Data serializer for responses."""
+
+ dag_run_types: DAGRunTypes
+ dag_run_states: DAGRunStates
+ task_instance_states: TaskInstantState
diff --git a/airflow/api_fastapi/views/ui/__init__.py
b/airflow/api_fastapi/views/ui/__init__.py
index 8495ac5e5e..f01686cc99 100644
--- a/airflow/api_fastapi/views/ui/__init__.py
+++ b/airflow/api_fastapi/views/ui/__init__.py
@@ -18,7 +18,9 @@ from __future__ import annotations
from airflow.api_fastapi.views.router import AirflowRouter
from airflow.api_fastapi.views.ui.assets import assets_router
+from airflow.api_fastapi.views.ui.dashboard import dashboard_router
ui_router = AirflowRouter(prefix="/ui")
ui_router.include_router(assets_router)
+ui_router.include_router(dashboard_router)
diff --git a/airflow/api_fastapi/views/ui/dashboard.py
b/airflow/api_fastapi/views/ui/dashboard.py
new file mode 100644
index 0000000000..0d6b69a1ce
--- /dev/null
+++ b/airflow/api_fastapi/views/ui/dashboard.py
@@ -0,0 +1,100 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from fastapi import Depends
+from sqlalchemy import func, select
+from sqlalchemy.orm import Session
+from typing_extensions import Annotated
+
+from airflow.api_fastapi.openapi.exceptions import create_openapi_http_exception_doc
+from airflow.api_fastapi.parameters import DateTimeQuery
+from airflow.api_fastapi.serializers.dashboard import HistoricalMetricDataResponse
+from airflow.models.dagrun import DagRun, DagRunType
+from airflow.models.taskinstance import TaskInstance
+from airflow.utils.state import DagRunState, TaskInstanceState
+
+if TYPE_CHECKING:
+ from sqlalchemy.orm import Session
+from airflow.api_fastapi.db.common import get_session
+from airflow.api_fastapi.views.router import AirflowRouter
+from airflow.utils import timezone
+
+dashboard_router = AirflowRouter(tags=["Dashboard"])
+
+
+@dashboard_router.get(
+ "/dashboard/historical_metrics_data",
+ include_in_schema=False,
+ responses=create_openapi_http_exception_doc([400]),
+)
+async def historical_metrics(
+ start_date: DateTimeQuery,
+ end_date: DateTimeQuery,
+ session: Annotated[Session, Depends(get_session)],
+) -> HistoricalMetricDataResponse:
+ """Return cluster activity historical metrics."""
+ # DagRuns
+ dag_run_types = session.execute(
+ select(DagRun.run_type, func.count(DagRun.run_id))
+ .where(
+ DagRun.start_date >= start_date,
+ func.coalesce(DagRun.end_date, timezone.utcnow()) <= end_date,
+ )
+ .group_by(DagRun.run_type)
+ ).all()
+
+ dag_run_states = session.execute(
+ select(DagRun.state, func.count(DagRun.run_id))
+ .where(
+ DagRun.start_date >= start_date,
+ func.coalesce(DagRun.end_date, timezone.utcnow()) <= end_date,
+ )
+ .group_by(DagRun.state)
+ ).all()
+
+ # TaskInstances
+ task_instance_states = session.execute(
+ select(TaskInstance.state, func.count(TaskInstance.run_id))
+ .join(TaskInstance.dag_run)
+ .where(
+ DagRun.start_date >= start_date,
+ func.coalesce(DagRun.end_date, timezone.utcnow()) <= end_date,
+ )
+ .group_by(TaskInstance.state)
+ ).all()
+
+ # Combining historical metrics response as dictionary
+ historical_metrics_response = {
+ "dag_run_types": {
+ **{dag_run_type.value: 0 for dag_run_type in DagRunType},
+ **dict(dag_run_types),
+ },
+ "dag_run_states": {
+ **{dag_run_state.value: 0 for dag_run_state in DagRunState},
+ **dict(dag_run_states),
+ },
+ "task_instance_states": {
+ "no_status": 0,
+ **{ti_state.value: 0 for ti_state in TaskInstanceState},
+ **{ti_state or "no_status": sum_value for ti_state, sum_value in task_instance_states},
+ },
+ }
+
+ return HistoricalMetricDataResponse.model_validate(historical_metrics_response, from_attributes=True)
diff --git a/airflow/ui/openapi-gen/queries/common.ts
b/airflow/ui/openapi-gen/queries/common.ts
index afd7afd5bc..fbbbac5d60 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -5,6 +5,7 @@ import {
AssetService,
ConnectionService,
DagService,
+ DashboardService,
VariableService,
} from "../requests/services.gen";
import { DagRunState } from "../requests/types.gen";
@@ -25,6 +26,28 @@ export const UseAssetServiceNextRunAssetsKeyFn = (
},
queryKey?: Array<unknown>,
) => [useAssetServiceNextRunAssetsKey, ...(queryKey ?? [{ dagId }])];
+export type DashboardServiceHistoricalMetricsDefaultResponse = Awaited<
+ ReturnType<typeof DashboardService.historicalMetrics>
+>;
+export type DashboardServiceHistoricalMetricsQueryResult<
+ TData = DashboardServiceHistoricalMetricsDefaultResponse,
+ TError = unknown,
+> = UseQueryResult<TData, TError>;
+export const useDashboardServiceHistoricalMetricsKey =
+ "DashboardServiceHistoricalMetrics";
+export const UseDashboardServiceHistoricalMetricsKeyFn = (
+ {
+ endDate,
+ startDate,
+ }: {
+ endDate: string;
+ startDate: string;
+ },
+ queryKey?: Array<unknown>,
+) => [
+ useDashboardServiceHistoricalMetricsKey,
+ ...(queryKey ?? [{ endDate, startDate }]),
+];
export type DagServiceGetDagsDefaultResponse = Awaited<
ReturnType<typeof DagService.getDags>
>;
diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts
b/airflow/ui/openapi-gen/queries/prefetch.ts
index cbb43cca3a..7c8555b29d 100644
--- a/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -5,6 +5,7 @@ import {
AssetService,
ConnectionService,
DagService,
+ DashboardService,
} from "../requests/services.gen";
import { DagRunState } from "../requests/types.gen";
import * as Common from "./common";
@@ -28,6 +29,32 @@ export const prefetchUseAssetServiceNextRunAssets = (
queryKey: Common.UseAssetServiceNextRunAssetsKeyFn({ dagId }),
queryFn: () => AssetService.nextRunAssets({ dagId }),
});
+/**
+ * Historical Metrics
+ * Return cluster activity historical metrics.
+ * @param data The data for the request.
+ * @param data.startDate
+ * @param data.endDate
+ * @returns HistoricalMetricDataResponse Successful Response
+ * @throws ApiError
+ */
+export const prefetchUseDashboardServiceHistoricalMetrics = (
+ queryClient: QueryClient,
+ {
+ endDate,
+ startDate,
+ }: {
+ endDate: string;
+ startDate: string;
+ },
+) =>
+ queryClient.prefetchQuery({
+ queryKey: Common.UseDashboardServiceHistoricalMetricsKeyFn({
+ endDate,
+ startDate,
+ }),
+ queryFn: () => DashboardService.historicalMetrics({ endDate, startDate }),
+ });
/**
* Get Dags
* Get all DAGs.
diff --git a/airflow/ui/openapi-gen/queries/queries.ts
b/airflow/ui/openapi-gen/queries/queries.ts
index 22d58eadda..9137ea4ed0 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -1,15 +1,16 @@
// generated with @7nohe/[email protected]
import {
- useMutation,
UseMutationOptions,
- useQuery,
UseQueryOptions,
+ useMutation,
+ useQuery,
} from "@tanstack/react-query";
import {
AssetService,
ConnectionService,
DagService,
+ DashboardService,
VariableService,
} from "../requests/services.gen";
import { DAGPatchBody, DagRunState } from "../requests/types.gen";
@@ -40,6 +41,39 @@ export const useAssetServiceNextRunAssets = <
queryFn: () => AssetService.nextRunAssets({ dagId }) as TData,
...options,
});
+/**
+ * Historical Metrics
+ * Return cluster activity historical metrics.
+ * @param data The data for the request.
+ * @param data.startDate
+ * @param data.endDate
+ * @returns HistoricalMetricDataResponse Successful Response
+ * @throws ApiError
+ */
+export const useDashboardServiceHistoricalMetrics = <
+ TData = Common.DashboardServiceHistoricalMetricsDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ endDate,
+ startDate,
+ }: {
+ endDate: string;
+ startDate: string;
+ },
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useQuery<TData, TError>({
+ queryKey: Common.UseDashboardServiceHistoricalMetricsKeyFn(
+ { endDate, startDate },
+ queryKey,
+ ),
+ queryFn: () =>
+ DashboardService.historicalMetrics({ endDate, startDate }) as TData,
+ ...options,
+ });
/**
* Get Dags
* Get all DAGs.
diff --git a/airflow/ui/openapi-gen/queries/suspense.ts
b/airflow/ui/openapi-gen/queries/suspense.ts
index 04d7eb94b3..70be4beb0d 100644
--- a/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow/ui/openapi-gen/queries/suspense.ts
@@ -5,6 +5,7 @@ import {
AssetService,
ConnectionService,
DagService,
+ DashboardService,
} from "../requests/services.gen";
import { DagRunState } from "../requests/types.gen";
import * as Common from "./common";
@@ -34,6 +35,39 @@ export const useAssetServiceNextRunAssetsSuspense = <
queryFn: () => AssetService.nextRunAssets({ dagId }) as TData,
...options,
});
+/**
+ * Historical Metrics
+ * Return cluster activity historical metrics.
+ * @param data The data for the request.
+ * @param data.startDate
+ * @param data.endDate
+ * @returns HistoricalMetricDataResponse Successful Response
+ * @throws ApiError
+ */
+export const useDashboardServiceHistoricalMetricsSuspense = <
+ TData = Common.DashboardServiceHistoricalMetricsDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ endDate,
+ startDate,
+ }: {
+ endDate: string;
+ startDate: string;
+ },
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useSuspenseQuery<TData, TError>({
+ queryKey: Common.UseDashboardServiceHistoricalMetricsKeyFn(
+ { endDate, startDate },
+ queryKey,
+ ),
+ queryFn: () =>
+ DashboardService.historicalMetrics({ endDate, startDate }) as TData,
+ ...options,
+ });
/**
* Get Dags
* Get all DAGs.
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 910354423b..26fe1180ae 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -784,6 +784,56 @@ export const $DAGResponse = {
description: "DAG serializer for responses.",
} as const;
+export const $DAGRunStates = {
+ properties: {
+ queued: {
+ type: "integer",
+ title: "Queued",
+ },
+ running: {
+ type: "integer",
+ title: "Running",
+ },
+ success: {
+ type: "integer",
+ title: "Success",
+ },
+ failed: {
+ type: "integer",
+ title: "Failed",
+ },
+ },
+ type: "object",
+ required: ["queued", "running", "success", "failed"],
+ title: "DAGRunStates",
+ description: "DAG Run States for responses.",
+} as const;
+
+export const $DAGRunTypes = {
+ properties: {
+ backfill: {
+ type: "integer",
+ title: "Backfill",
+ },
+ scheduled: {
+ type: "integer",
+ title: "Scheduled",
+ },
+ manual: {
+ type: "integer",
+ title: "Manual",
+ },
+ dataset_triggered: {
+ type: "integer",
+ title: "Dataset Triggered",
+ },
+ },
+ type: "object",
+ required: ["backfill", "scheduled", "manual", "dataset_triggered"],
+ title: "DAGRunTypes",
+ description: "DAG Run Types for responses.",
+} as const;
+
export const $DagRunState = {
type: "string",
enum: ["queued", "running", "success", "failed"],
@@ -847,6 +897,99 @@ export const $HTTPValidationError = {
title: "HTTPValidationError",
} as const;
+export const $HistoricalMetricDataResponse = {
+ properties: {
+ dag_run_types: {
+ $ref: "#/components/schemas/DAGRunTypes",
+ },
+ dag_run_states: {
+ $ref: "#/components/schemas/DAGRunStates",
+ },
+ task_instance_states: {
+ $ref: "#/components/schemas/TaskInstantState",
+ },
+ },
+ type: "object",
+ required: ["dag_run_types", "dag_run_states", "task_instance_states"],
+ title: "HistoricalMetricDataResponse",
+ description: "Historical Metric Data serializer for responses.",
+} as const;
+
+export const $TaskInstantState = {
+ properties: {
+ no_status: {
+ type: "integer",
+ title: "No Status",
+ },
+ removed: {
+ type: "integer",
+ title: "Removed",
+ },
+ scheduled: {
+ type: "integer",
+ title: "Scheduled",
+ },
+ queued: {
+ type: "integer",
+ title: "Queued",
+ },
+ running: {
+ type: "integer",
+ title: "Running",
+ },
+ success: {
+ type: "integer",
+ title: "Success",
+ },
+ restarting: {
+ type: "integer",
+ title: "Restarting",
+ },
+ failed: {
+ type: "integer",
+ title: "Failed",
+ },
+ up_for_retry: {
+ type: "integer",
+ title: "Up For Retry",
+ },
+ up_for_reschedule: {
+ type: "integer",
+ title: "Up For Reschedule",
+ },
+ upstream_failed: {
+ type: "integer",
+ title: "Upstream Failed",
+ },
+ skipped: {
+ type: "integer",
+ title: "Skipped",
+ },
+ deferred: {
+ type: "integer",
+ title: "Deferred",
+ },
+ },
+ type: "object",
+ required: [
+ "no_status",
+ "removed",
+ "scheduled",
+ "queued",
+ "running",
+ "success",
+ "restarting",
+ "failed",
+ "up_for_retry",
+ "up_for_reschedule",
+ "upstream_failed",
+ "skipped",
+ "deferred",
+ ],
+ title: "TaskInstantState",
+ description: "TaskInstance serializer for responses.",
+} as const;
+
export const $ValidationError = {
properties: {
loc: {
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts
b/airflow/ui/openapi-gen/requests/services.gen.ts
index 72fd2f68f1..268f636404 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -5,6 +5,8 @@ import { request as __request } from "./core/request";
import type {
NextRunAssetsData,
NextRunAssetsResponse,
+ HistoricalMetricsData,
+ HistoricalMetricsResponse,
GetDagsData,
GetDagsResponse,
PatchDagsData,
@@ -45,6 +47,34 @@ export class AssetService {
}
}
+export class DashboardService {
+ /**
+ * Historical Metrics
+ * Return cluster activity historical metrics.
+ * @param data The data for the request.
+ * @param data.startDate
+ * @param data.endDate
+ * @returns HistoricalMetricDataResponse Successful Response
+ * @throws ApiError
+ */
+ public static historicalMetrics(
+ data: HistoricalMetricsData,
+ ): CancelablePromise<HistoricalMetricsResponse> {
+ return __request(OpenAPI, {
+ method: "GET",
+ url: "/ui/dashboard/historical_metrics_data",
+ query: {
+ start_date: data.startDate,
+ end_date: data.endDate,
+ },
+ errors: {
+ 400: "Bad Request",
+ 422: "Validation Error",
+ },
+ });
+ }
+}
+
export class DagService {
/**
* Get Dags
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow/ui/openapi-gen/requests/types.gen.ts
index d07d980397..268960a596 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -120,6 +120,26 @@ export type DAGResponse = {
readonly file_token: string;
};
+/**
+ * DAG Run States for responses.
+ */
+export type DAGRunStates = {
+ queued: number;
+ running: number;
+ success: number;
+ failed: number;
+};
+
+/**
+ * DAG Run Types for responses.
+ */
+export type DAGRunTypes = {
+ backfill: number;
+ scheduled: number;
+ manual: number;
+ dataset_triggered: number;
+};
+
/**
* All possible states that a DagRun can be in.
*
@@ -152,6 +172,34 @@ export type HTTPValidationError = {
detail?: Array<ValidationError>;
};
+/**
+ * Historical Metric Data serializer for responses.
+ */
+export type HistoricalMetricDataResponse = {
+ dag_run_types: DAGRunTypes;
+ dag_run_states: DAGRunStates;
+ task_instance_states: TaskInstantState;
+};
+
+/**
+ * TaskInstance serializer for responses.
+ */
+export type TaskInstantState = {
+ no_status: number;
+ removed: number;
+ scheduled: number;
+ queued: number;
+ running: number;
+ success: number;
+ restarting: number;
+ failed: number;
+ up_for_retry: number;
+ up_for_reschedule: number;
+ upstream_failed: number;
+ skipped: number;
+ deferred: number;
+};
+
export type ValidationError = {
loc: Array<string | number>;
msg: string;
@@ -166,6 +214,13 @@ export type NextRunAssetsResponse = {
[key: string]: unknown;
};
+export type HistoricalMetricsData = {
+ endDate: string;
+ startDate: string;
+};
+
+export type HistoricalMetricsResponse = HistoricalMetricDataResponse;
+
export type GetDagsData = {
dagDisplayNamePattern?: string | null;
dagIdPattern?: string | null;
@@ -246,6 +301,25 @@ export type $OpenApiTs = {
};
};
};
+ "/ui/dashboard/historical_metrics_data": {
+ get: {
+ req: HistoricalMetricsData;
+ res: {
+ /**
+ * Successful Response
+ */
+ 200: HistoricalMetricDataResponse;
+ /**
+ * Bad Request
+ */
+ 400: HTTPExceptionResponse;
+ /**
+ * Validation Error
+ */
+ 422: HTTPValidationError;
+ };
+ };
+ };
"/public/dags/": {
get: {
req: GetDagsData;
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 7782da955c..8dba4c4fcc 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -3348,6 +3348,7 @@ class Airflow(AirflowBaseView):
@expose("/object/historical_metrics_data")
@auth.has_access_view(AccessView.CLUSTER_ACTIVITY)
+ @mark_fastapi_migration_done
def historical_metrics_data(self):
"""Return cluster activity historical metrics."""
start_date = _safe_parse_datetime(request.args.get("start_date"))
diff --git a/tests/api_fastapi/views/ui/test_dashboard.py
b/tests/api_fastapi/views/ui/test_dashboard.py
new file mode 100644
index 0000000000..970b79ad35
--- /dev/null
+++ b/tests/api_fastapi/views/ui/test_dashboard.py
@@ -0,0 +1,151 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import timedelta
+
+import pendulum
+import pytest
+
+from airflow.models import DagBag
+from airflow.operators.empty import EmptyOperator
+from airflow.utils.state import DagRunState, TaskInstanceState
+from airflow.utils.types import DagRunType
+from tests.test_utils.db import clear_db_runs
+
+pytestmark = pytest.mark.db_test
+
+
[email protected](autouse=True, scope="module")
+def examples_dag_bag():
+ # Speed up: We don't want example dags for this module
+
+ return DagBag(include_examples=False, read_dags_from_db=True)
+
+
[email protected](autouse=True)
+def clean():
+ clear_db_runs()
+ yield
+ clear_db_runs()
+
+
+# freeze time fixture so that it is applied before `make_dag_runs` is!
[email protected]
+def freeze_time_for_dagruns(time_machine):
+ time_machine.move_to("2023-05-02T00:00:00+00:00", tick=False)
+
+
[email protected]
+def make_dag_runs(dag_maker, session, time_machine):
+ with dag_maker(
+ dag_id="test_dag_id",
+ serialized=True,
+ session=session,
+ start_date=pendulum.DateTime(2023, 2, 1, 0, 0, 0, tzinfo=pendulum.UTC),
+ ):
+ EmptyOperator(task_id="task_1") >> EmptyOperator(task_id="task_2")
+
+ date = dag_maker.dag.start_date
+
+ run1 = dag_maker.create_dagrun(
+ run_id="run_1",
+ state=DagRunState.SUCCESS,
+ run_type=DagRunType.SCHEDULED,
+ execution_date=date,
+ start_date=date,
+ )
+
+ run2 = dag_maker.create_dagrun(
+ run_id="run_2",
+ state=DagRunState.FAILED,
+ run_type=DagRunType.DATASET_TRIGGERED,
+ execution_date=date + timedelta(days=1),
+ start_date=date + timedelta(days=1),
+ )
+
+ run3 = dag_maker.create_dagrun(
+ run_id="run_3",
+ state=DagRunState.RUNNING,
+ run_type=DagRunType.SCHEDULED,
+ execution_date=pendulum.DateTime(2023, 2, 3, 0, 0, 0, tzinfo=pendulum.UTC),
+ start_date=pendulum.DateTime(2023, 2, 3, 0, 0, 0, tzinfo=pendulum.UTC),
+ )
+ run3.end_date = None
+
+ for ti in run1.task_instances:
+ ti.state = TaskInstanceState.SUCCESS
+
+ for ti in run2.task_instances:
+ ti.state = TaskInstanceState.FAILED
+
+ dag_maker.dagbag.sync_to_db()
+ time_machine.move_to("2023-07-02T00:00:00+00:00", tick=False)
+
+
+class TestHistoricalMetricsDataEndpoint:
+ @pytest.mark.usefixtures("freeze_time_for_dagruns", "make_dag_runs")
+ def test_historical_metrics_data(self, test_client, time_machine):
+ params = {"start_date": "2023-01-01T00:00", "end_date": "2023-08-02T00:00"}
+ response = test_client.get("/ui/dashboard/historical_metrics_data", params=params)
+
+ assert response.status_code == 200
+ assert response.json() == {
+ "dag_run_states": {"failed": 1, "queued": 0, "running": 1, "success": 1},
+ "dag_run_types": {"backfill": 0, "dataset_triggered": 1, "manual": 0, "scheduled": 2},
+ "task_instance_states": {
+ "deferred": 0,
+ "failed": 2,
+ "no_status": 2,
+ "queued": 0,
+ "removed": 0,
+ "restarting": 0,
+ "running": 0,
+ "scheduled": 0,
+ "skipped": 0,
+ "success": 2,
+ "up_for_reschedule": 0,
+ "up_for_retry": 0,
+ "upstream_failed": 0,
+ },
+ }
+
+ @pytest.mark.usefixtures("freeze_time_for_dagruns", "make_dag_runs")
+ def test_historical_metrics_data_date_filters(self, test_client):
+ params = {"start_date": "2023-02-02T00:00", "end_date": "2023-06-02T00:00"}
+ response = test_client.get("/ui/dashboard/historical_metrics_data", params=params)
+ assert response.status_code == 200
+ assert response.json() == {
+ "dag_run_states": {"failed": 1, "queued": 0, "running": 0, "success": 0},
+ "dag_run_types": {"backfill": 0, "dataset_triggered": 1, "manual": 0, "scheduled": 0},
+ "task_instance_states": {
+ "deferred": 0,
+ "failed": 2,
+ "no_status": 0,
+ "queued": 0,
+ "removed": 0,
+ "restarting": 0,
+ "running": 0,
+ "scheduled": 0,
+ "skipped": 0,
+ "success": 0,
+ "up_for_reschedule": 0,
+ "up_for_retry": 0,
+ "upstream_failed": 0,
+ },
+ }