This is an automated email from the ASF dual-hosted git repository.
bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7751594c31 feat: add dag_stats rest api endpoint (#41017)
7751594c31 is described below
commit 7751594c31aca65ee5a55b5f533cdec9070661e0
Author: Sebastian Daum <[email protected]>
AuthorDate: Thu Aug 1 22:10:26 2024 +0300
feat: add dag_stats rest api endpoint (#41017)
---
.../api_connexion/endpoints/dag_stats_endpoint.py | 62 +++++++
airflow/api_connexion/openapi/v1.yaml | 66 ++++++++
airflow/api_connexion/schemas/dag_stats_schema.py | 47 ++++++
airflow/www/static/js/types/api-generated.ts | 50 ++++++
.../endpoints/test_dag_stats_endpoint.py | 186 +++++++++++++++++++++
.../api_connexion/schemas/test_dag_stats_schema.py | 87 ++++++++++
6 files changed, 498 insertions(+)
diff --git a/airflow/api_connexion/endpoints/dag_stats_endpoint.py b/airflow/api_connexion/endpoints/dag_stats_endpoint.py
new file mode 100644
index 0000000000..705af10d41
--- /dev/null
+++ b/airflow/api_connexion/endpoints/dag_stats_endpoint.py
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from flask import g
+from sqlalchemy import func, select
+
+from airflow.api_connexion import security
+from airflow.api_connexion.schemas.dag_stats_schema import (
+ dag_stats_collection_schema,
+)
+from airflow.auth.managers.models.resource_details import DagAccessEntity
+from airflow.models.dag import DagRun
+from airflow.utils.session import NEW_SESSION, provide_session
+from airflow.utils.state import DagRunState
+from airflow.www.extensions.init_auth_manager import get_auth_manager
+
+if TYPE_CHECKING:
+ from sqlalchemy.orm import Session
+
+ from airflow.api_connexion.types import APIResponse
+
+
+@security.requires_access_dag("GET", DagAccessEntity.RUN)
+@provide_session
+def get_dag_stats(*, dag_ids: str, session: Session = NEW_SESSION) -> APIResponse:
+ """Get Dag statistics."""
+ allowed_dag_ids = get_auth_manager().get_permitted_dag_ids(methods=["GET"], user=g.user)
+ dags_list = set(dag_ids.split(","))
+ filter_dag_ids = dags_list.intersection(allowed_dag_ids)
+
+ query = (
+ select(DagRun.dag_id, DagRun.state, func.count(DagRun.state))
+ .group_by(DagRun.dag_id, DagRun.state)
+ .where(DagRun.dag_id.in_(filter_dag_ids))
+ )
+ dag_state_stats = session.execute(query)
+
+ dag_state_data = {(dag_id, state): count for dag_id, state, count in dag_state_stats}
+ dag_stats = {
+ dag_id: [{"state": state, "count": dag_state_data.get((dag_id, state), 0)} for state in DagRunState]
+ for dag_id in filter_dag_ids
+ }
+
+ dags = [{"dag_id": stat, "stats": dag_stats[stat]} for stat in dag_stats]
+ return dag_stats_collection_schema.dump({"dags": dags, "total_entries": len(dag_stats)})
diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml
index 9a2d66eb5c..0394da4f46 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -2225,6 +2225,32 @@ paths:
"404":
$ref: "#/components/responses/NotFound"
+ /dagStats:
+ get:
+ summary: List Dag statistics
+ x-openapi-router-controller: airflow.api_connexion.endpoints.dag_stats_endpoint
+ operationId: get_dag_stats
+ tags: [DagStats]
+ parameters:
+ - name: dag_ids
+ in: query
+ schema:
+ type: string
+ required: true
+ description: |
+ One or more DAG IDs separated by commas to filter relevant Dags.
+ responses:
+ "200":
+ description: Success.
+ content:
+ application/json:
+ schema:
+ $ref: "#/components/schemas/DagStatsCollectionSchema"
+ "401":
+ $ref: "#/components/responses/Unauthenticated"
+ "403":
+ $ref: "#/components/responses/PermissionDenied"
+
/dagSources/{file_token}:
parameters:
- $ref: "#/components/parameters/FileToken"
@@ -3424,6 +3450,46 @@ components:
$ref: "#/components/schemas/DAGRun"
- $ref: "#/components/schemas/CollectionInfo"
+ DagStatsCollectionSchema:
+ type: object
+ description: |
+ Collection of Dag statistics.
+
+ allOf:
+ - type: object
+ properties:
+ dags:
+ type: array
+ items:
+ $ref: "#/components/schemas/DagStatsCollectionItem"
+ - $ref: "#/components/schemas/CollectionInfo"
+
+ DagStatsCollectionItem:
+ description: DagStats entry collection item.
+
+ type: object
+ properties:
+ dag_id:
+ type: string
+ description: The DAG ID.
+ stats:
+ type: array
+ nullable: true
+ items:
+ $ref: "#/components/schemas/DagStatsStateCollectionItem"
+
+ DagStatsStateCollectionItem:
+ description: DagStatsState entry collection item.
+
+ type: object
+ properties:
+ state:
+ type: string
+ description: The DAG state.
+ count:
+ type: integer
+ description: The DAG state count.
+
DagWarning:
type: object
properties:
diff --git a/airflow/api_connexion/schemas/dag_stats_schema.py b/airflow/api_connexion/schemas/dag_stats_schema.py
new file mode 100644
index 0000000000..7fdcc29054
--- /dev/null
+++ b/airflow/api_connexion/schemas/dag_stats_schema.py
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from marshmallow import Schema, fields
+
+from airflow.api_connexion.schemas.enum_schemas import DagStateField
+
+
+class DagStatsStateSchema(Schema):
+ """DagStatsState Schema."""
+
+ state = DagStateField(dump_only=True)
+ count = fields.Int(dump_only=True)
+
+
+class DagStatsSchema(Schema):
+ """DagStats Schema."""
+
+ dag_id = fields.String(required=True)
+ stats = fields.List(fields.Nested(DagStatsStateSchema))
+
+
+class DagStatsCollectionSchema(Schema):
+ """DagStassCollection Schema."""
+
+ dags = fields.List(fields.Nested(DagStatsSchema))
+ total_entries = fields.Int()
+
+
+dag_stats_state_schema = DagStatsStateSchema()
+dag_stats_schema = DagStatsSchema()
+dag_stats_collection_schema = DagStatsCollectionSchema()
diff --git a/airflow/www/static/js/types/api-generated.ts b/airflow/www/static/js/types/api-generated.ts
index 6cda5f6265..a892e327ac 100644
--- a/airflow/www/static/js/types/api-generated.ts
+++ b/airflow/www/static/js/types/api-generated.ts
@@ -737,6 +737,9 @@ export interface paths {
};
};
};
+ "/dagStats": {
+ get: operations["get_dag_stats"];
+ };
"/dagSources/{file_token}": {
/** Get a source code using file token. */
get: operations["get_dag_source"];
@@ -1264,6 +1267,23 @@ export interface components {
DAGRunCollection: {
dag_runs?: components["schemas"]["DAGRun"][];
} & components["schemas"]["CollectionInfo"];
+ /** @description Collection of Dag statistics. */
+ DagStatsCollectionSchema: {
+ dags?: components["schemas"]["DagStatsCollectionItem"][];
+ } & components["schemas"]["CollectionInfo"];
+ /** @description DagStats entry collection item. */
+ DagStatsCollectionItem: {
+ /** @description The DAG ID. */
+ dag_id?: string;
+ stats?: components["schemas"]["DagStatsStateCollectionItem"][] | null;
+ };
+ /** @description DagStatsState entry collection item. */
+ DagStatsStateCollectionItem: {
+ /** @description The DAG state. */
+ state?: string;
+ /** @description The DAG state count. */
+ count?: number;
+ };
DagWarning: {
/** @description The dag_id. */
dag_id?: string;
@@ -4820,6 +4840,24 @@ export interface operations {
404: components["responses"]["NotFound"];
};
};
+ get_dag_stats: {
+ parameters: {
+ query: {
+ /** One or more DAG IDs separated by commas to filter relevant Dags. */
+ dag_ids: string;
+ };
+ };
+ responses: {
+ /** Success. */
+ 200: {
+ content: {
+ "application/json": components["schemas"]["DagStatsCollectionSchema"];
+ };
+ };
+ 401: components["responses"]["Unauthenticated"];
+ 403: components["responses"]["PermissionDenied"];
+ };
+ };
/** Get a source code using file token. */
get_dag_source: {
parameters: {
@@ -5433,6 +5471,15 @@ export type UpdateDagRunState = CamelCasedPropertiesDeep<
export type DAGRunCollection = CamelCasedPropertiesDeep<
components["schemas"]["DAGRunCollection"]
>;
+export type DagStatsCollectionSchema = CamelCasedPropertiesDeep<
+ components["schemas"]["DagStatsCollectionSchema"]
+>;
+export type DagStatsCollectionItem = CamelCasedPropertiesDeep<
+ components["schemas"]["DagStatsCollectionItem"]
+>;
+export type DagStatsStateCollectionItem = CamelCasedPropertiesDeep<
+ components["schemas"]["DagStatsStateCollectionItem"]
+>;
export type DagWarning = CamelCasedPropertiesDeep<
components["schemas"]["DagWarning"]
>;
@@ -5894,6 +5941,9 @@ export type GetTasksVariables = CamelCasedPropertiesDeep<
export type GetTaskVariables = CamelCasedPropertiesDeep<
operations["get_task"]["parameters"]["path"]
>;
+export type GetDagStatsVariables = CamelCasedPropertiesDeep<
+ operations["get_dag_stats"]["parameters"]["query"]
+>;
export type GetDagSourceVariables = CamelCasedPropertiesDeep<
operations["get_dag_source"]["parameters"]["path"]
>;
diff --git a/tests/api_connexion/endpoints/test_dag_stats_endpoint.py b/tests/api_connexion/endpoints/test_dag_stats_endpoint.py
new file mode 100644
index 0000000000..631369bff4
--- /dev/null
+++ b/tests/api_connexion/endpoints/test_dag_stats_endpoint.py
@@ -0,0 +1,186 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import timedelta
+
+import pytest
+
+from airflow.models.dag import DAG, DagModel
+from airflow.models.dagrun import DagRun
+from airflow.security import permissions
+from airflow.utils import timezone
+from airflow.utils.session import create_session
+from airflow.utils.state import DagRunState
+from airflow.utils.types import DagRunType
+from tests.test_utils.api_connexion_utils import create_user, delete_user
+from tests.test_utils.db import clear_db_dags, clear_db_runs, clear_db_serialized_dags
+
+pytestmark = pytest.mark.db_test
+
+
+@pytest.fixture(scope="module")
+def configured_app(minimal_app_for_api):
+ app = minimal_app_for_api
+
+ create_user(
+ app, # type: ignore
+ username="test",
+ role_name="Test",
+ permissions=[
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+ ],
+ )
+
+ create_user(app, username="test_no_permissions", role_name="TestNoPermissions") # type: ignore
+
+ yield app
+
+ delete_user(app, username="test") # type: ignore
+ delete_user(app, username="test_no_permissions") # type: ignore
+
+
+class TestDagStatsEndpoint:
+ default_time = "2020-06-11T18:00:00+00:00"
+
+ @pytest.fixture(autouse=True)
+ def setup_attrs(self, configured_app) -> None:
+ self.app = configured_app
+ self.client = self.app.test_client() # type:ignore
+ clear_db_runs()
+ clear_db_serialized_dags()
+ clear_db_dags()
+
+ def teardown_method(self) -> None:
+ clear_db_runs()
+ clear_db_dags()
+ clear_db_serialized_dags()
+
+ def _create_dag(self, dag_id):
+ dag_instance = DagModel(dag_id=dag_id)
+ dag_instance.is_active = True
+ with create_session() as session:
+ session.add(dag_instance)
+ dag = DAG(dag_id=dag_id, schedule=None)
+ self.app.dag_bag.bag_dag(dag, root_dag=dag)
+ return dag_instance
+
+ def test_should_respond_200(self, session):
+ self._create_dag("dag_stats_dag")
+ self._create_dag("dag_stats_dag_2")
+ dag_1_run_1 = DagRun(
+ dag_id="dag_stats_dag",
+ run_id="test_dag_run_id_1",
+ run_type=DagRunType.MANUAL,
+ execution_date=timezone.parse(self.default_time),
+ start_date=timezone.parse(self.default_time),
+ external_trigger=True,
+ state="running",
+ )
+ dag_1_run_2 = DagRun(
+ dag_id="dag_stats_dag",
+ run_id="test_dag_run_id_2",
+ run_type=DagRunType.MANUAL,
+ execution_date=timezone.parse(self.default_time) + timedelta(days=1),
+ start_date=timezone.parse(self.default_time),
+ external_trigger=True,
+ state="failed",
+ )
+ dag_2_run_1 = DagRun(
+ dag_id="dag_stats_dag_2",
+ run_id="test_dag_2_run_id_1",
+ run_type=DagRunType.MANUAL,
+ execution_date=timezone.parse(self.default_time),
+ start_date=timezone.parse(self.default_time),
+ external_trigger=True,
+ state="queued",
+ )
+ session.add_all((dag_1_run_1, dag_1_run_2, dag_2_run_1))
+ session.commit()
+ exp_payload = {
+ "dags": [
+ {
+ "dag_id": "dag_stats_dag",
+ "stats": [
+ {
+ "state": DagRunState.QUEUED,
+ "count": 0,
+ },
+ {
+ "state": DagRunState.RUNNING,
+ "count": 1,
+ },
+ {
+ "state": DagRunState.SUCCESS,
+ "count": 0,
+ },
+ {
+ "state": DagRunState.FAILED,
+ "count": 1,
+ },
+ ],
+ },
+ {
+ "dag_id": "dag_stats_dag_2",
+ "stats": [
+ {
+ "state": DagRunState.QUEUED,
+ "count": 1,
+ },
+ {
+ "state": DagRunState.RUNNING,
+ "count": 0,
+ },
+ {
+ "state": DagRunState.SUCCESS,
+ "count": 0,
+ },
+ {
+ "state": DagRunState.FAILED,
+ "count": 0,
+ },
+ ],
+ },
+ ],
+ "total_entries": 2,
+ }
+
+ dag_ids = "dag_stats_dag,dag_stats_dag_2"
+ response = self.client.get(
+ f"api/v1/dagStats?dag_ids={dag_ids}", environ_overrides={"REMOTE_USER": "test"}
+ )
+ assert response.status_code == 200
+ assert len(response.json["dags"]) == 2
+ assert sorted(response.json["dags"], key=lambda d: d["dag_id"]) == sorted(
+ exp_payload["dags"], key=lambda d: d["dag_id"]
+ )
+ response.json["total_entries"] == 2
+
+ def test_should_raises_401_unauthenticated(self):
+ dag_ids = "dag_stats_dag,dag_stats_dag_2"
+ response = self.client.get(
+ f"api/v1/dagStats?dag_ids={dag_ids}", environ_overrides={"REMOTE_USER": "no_user"}
+ )
+ assert response.status_code == 401
+
+ def test_should_raises_403_no_permission(self):
+ dag_ids = "dag_stats_dag,dag_stats_dag_2"
+ response = self.client.get(
+ f"api/v1/dagStats?dag_ids={dag_ids}", environ_overrides={"REMOTE_USER": "test_no_permissions"}
+ )
+ assert response.status_code == 403
diff --git a/tests/api_connexion/schemas/test_dag_stats_schema.py b/tests/api_connexion/schemas/test_dag_stats_schema.py
new file mode 100644
index 0000000000..8899b3fb12
--- /dev/null
+++ b/tests/api_connexion/schemas/test_dag_stats_schema.py
@@ -0,0 +1,87 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from airflow.api_connexion.schemas.dag_stats_schema import (
+ dag_stats_collection_schema,
+ dag_stats_schema,
+ dag_stats_state_schema,
+)
+from airflow.utils.state import DagRunState
+
+
+class TestDagStatsStateSchema:
+ def test_dag_stats_state_schema(self):
+ payload = {
+ "state": DagRunState.RUNNING,
+ "count": 2,
+ }
+ serialized_data = dag_stats_state_schema.dump(payload)
+ assert serialized_data == payload
+
+
+class TestDagStatsSchema:
+ def test_dag_stats_schema(self):
+ payload = {
+ "dag_id": "test_dag_id",
+ "stats": [
+ {
+ "state": DagRunState.QUEUED,
+ "count": 2,
+ },
+ {
+ "state": DagRunState.FAILED,
+ "count": 1,
+ },
+ ],
+ }
+ serialized_data = dag_stats_schema.dump(payload)
+ assert serialized_data == payload
+
+
+class TestDagStatsCollectionSchema:
+ def test_dag_stats_collection_schema(self):
+ payload = {
+ "dags": [
+ {
+ "dag_id": "test_dag_id",
+ "stats": [
+ {
+ "state": DagRunState.RUNNING,
+ "count": 2,
+ },
+ {
+ "state": DagRunState.SUCCESS,
+ "count": 1,
+ },
+ ],
+ },
+ {
+ "dag_id": "test_dag_id_2",
+ "stats": [
+ {
+ "state": DagRunState.RUNNING,
+ "count": 2,
+ }
+ ],
+ },
+ {"dag_id": "test_dag_id_3", "stats": []},
+ ],
+ "total_entries": 3,
+ }
+ serialized_data = dag_stats_collection_schema.dump(payload)
+ assert serialized_data == payload