This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7751594c31 feat: add dag_stats rest api endpoint (#41017)
7751594c31 is described below

commit 7751594c31aca65ee5a55b5f533cdec9070661e0
Author: Sebastian Daum <[email protected]>
AuthorDate: Thu Aug 1 22:10:26 2024 +0300

    feat: add dag_stats rest api endpoint (#41017)
---
 .../api_connexion/endpoints/dag_stats_endpoint.py  |  62 +++++++
 airflow/api_connexion/openapi/v1.yaml              |  66 ++++++++
 airflow/api_connexion/schemas/dag_stats_schema.py  |  47 ++++++
 airflow/www/static/js/types/api-generated.ts       |  50 ++++++
 .../endpoints/test_dag_stats_endpoint.py           | 186 +++++++++++++++++++++
 .../api_connexion/schemas/test_dag_stats_schema.py |  87 ++++++++++
 6 files changed, 498 insertions(+)

diff --git a/airflow/api_connexion/endpoints/dag_stats_endpoint.py b/airflow/api_connexion/endpoints/dag_stats_endpoint.py
new file mode 100644
index 0000000000..705af10d41
--- /dev/null
+++ b/airflow/api_connexion/endpoints/dag_stats_endpoint.py
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from flask import g
+from sqlalchemy import func, select
+
+from airflow.api_connexion import security
+from airflow.api_connexion.schemas.dag_stats_schema import (
+    dag_stats_collection_schema,
+)
+from airflow.auth.managers.models.resource_details import DagAccessEntity
+from airflow.models.dag import DagRun
+from airflow.utils.session import NEW_SESSION, provide_session
+from airflow.utils.state import DagRunState
+from airflow.www.extensions.init_auth_manager import get_auth_manager
+
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Session
+
+    from airflow.api_connexion.types import APIResponse
+
+
+@security.requires_access_dag("GET", DagAccessEntity.RUN)
+@provide_session
+def get_dag_stats(*, dag_ids: str, session: Session = NEW_SESSION) -> APIResponse:
+    """Get per-state DAG run counts for the requested DAGs the user is permitted to read."""
+    allowed_dag_ids = get_auth_manager().get_permitted_dag_ids(methods=["GET"], user=g.user)
+    dags_list = {dag_id.strip() for dag_id in dag_ids.split(",") if dag_id.strip()}
+    filter_dag_ids = dags_list.intersection(allowed_dag_ids)
+
+    query = (
+        select(DagRun.dag_id, DagRun.state, func.count(DagRun.state))
+        .group_by(DagRun.dag_id, DagRun.state)
+        .where(DagRun.dag_id.in_(filter_dag_ids))
+    )
+    dag_state_stats = session.execute(query)
+
+    dag_state_data = {(dag_id, state): count for dag_id, state, count in dag_state_stats}
+    dag_stats = {
+        dag_id: [{"state": state, "count": dag_state_data.get((dag_id, state), 0)} for state in DagRunState]
+        for dag_id in sorted(filter_dag_ids)
+    }
+
+    dags = [{"dag_id": stat, "stats": dag_stats[stat]} for stat in dag_stats]
+    return dag_stats_collection_schema.dump({"dags": dags, "total_entries": len(dag_stats)})
diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml
index 9a2d66eb5c..0394da4f46 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -2225,6 +2225,32 @@ paths:
         "404":
           $ref: "#/components/responses/NotFound"
 
+  /dagStats:
+    get:
+      summary: List Dag statistics
+      x-openapi-router-controller: airflow.api_connexion.endpoints.dag_stats_endpoint
+      operationId: get_dag_stats
+      tags: [DagStats]
+      parameters:
+        - name: dag_ids
+          in: query
+          schema:
+            type: string
+          required: true
+          description: |
+            One or more DAG IDs separated by commas to filter relevant Dags.
+      responses:
+        "200":
+          description: Success.
+          content:
+            application/json:
+              schema:
+                $ref: "#/components/schemas/DagStatsCollectionSchema"
+        "401":
+          $ref: "#/components/responses/Unauthenticated"
+        "403":
+          $ref: "#/components/responses/PermissionDenied"
+
   /dagSources/{file_token}:
     parameters:
       - $ref: "#/components/parameters/FileToken"
@@ -3424,6 +3450,46 @@ components:
                 $ref: "#/components/schemas/DAGRun"
         - $ref: "#/components/schemas/CollectionInfo"
 
+    DagStatsCollectionSchema:
+      type: object
+      description: |
+        Collection of Dag statistics.
+
+      allOf:
+        - type: object
+          properties:
+            dags:
+              type: array
+              items:
+                $ref: "#/components/schemas/DagStatsCollectionItem"
+        - $ref: "#/components/schemas/CollectionInfo"
+
+    DagStatsCollectionItem:
+      description: DagStats entry collection item.
+
+      type: object
+      properties:
+        dag_id:
+          type: string
+          description: The DAG ID.
+        stats:
+          type: array
+          nullable: true
+          items:
+            $ref: "#/components/schemas/DagStatsStateCollectionItem"
+
+    DagStatsStateCollectionItem:
+      description: DagStatsState entry collection item.
+
+      type: object
+      properties:
+        state:
+          type: string
+          description: The DAG state.
+        count:
+          type: integer
+          description: The DAG state count.
+
     DagWarning:
       type: object
       properties:
diff --git a/airflow/api_connexion/schemas/dag_stats_schema.py b/airflow/api_connexion/schemas/dag_stats_schema.py
new file mode 100644
index 0000000000..7fdcc29054
--- /dev/null
+++ b/airflow/api_connexion/schemas/dag_stats_schema.py
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from marshmallow import Schema, fields
+
+from airflow.api_connexion.schemas.enum_schemas import DagStateField
+
+
+class DagStatsStateSchema(Schema):
+    """Schema for the run count of a single DAG run state."""
+
+    state = DagStateField(dump_only=True)
+    count = fields.Int(dump_only=True)
+
+
+class DagStatsSchema(Schema):
+    """Schema for the per-state run counts of one DAG."""
+
+    dag_id = fields.String(required=True)
+    stats = fields.List(fields.Nested(DagStatsStateSchema))
+
+
+class DagStatsCollectionSchema(Schema):
+    """DagStatsCollection Schema: per-DAG run state statistics plus total_entries."""
+
+    dags = fields.List(fields.Nested(DagStatsSchema))
+    total_entries = fields.Int()
+
+
+dag_stats_state_schema = DagStatsStateSchema()
+dag_stats_schema = DagStatsSchema()
+dag_stats_collection_schema = DagStatsCollectionSchema()
diff --git a/airflow/www/static/js/types/api-generated.ts b/airflow/www/static/js/types/api-generated.ts
index 6cda5f6265..a892e327ac 100644
--- a/airflow/www/static/js/types/api-generated.ts
+++ b/airflow/www/static/js/types/api-generated.ts
@@ -737,6 +737,9 @@ export interface paths {
       };
     };
   };
+  "/dagStats": {
+    get: operations["get_dag_stats"];
+  };
   "/dagSources/{file_token}": {
     /** Get a source code using file token. */
     get: operations["get_dag_source"];
@@ -1264,6 +1267,23 @@ export interface components {
     DAGRunCollection: {
       dag_runs?: components["schemas"]["DAGRun"][];
     } & components["schemas"]["CollectionInfo"];
+    /** @description Collection of Dag statistics. */
+    DagStatsCollectionSchema: {
+      dags?: components["schemas"]["DagStatsCollectionItem"][];
+    } & components["schemas"]["CollectionInfo"];
+    /** @description DagStats entry collection item. */
+    DagStatsCollectionItem: {
+      /** @description The DAG ID. */
+      dag_id?: string;
+      stats?: components["schemas"]["DagStatsStateCollectionItem"][] | null;
+    };
+    /** @description DagStatsState entry collection item. */
+    DagStatsStateCollectionItem: {
+      /** @description The DAG state. */
+      state?: string;
+      /** @description The DAG state count. */
+      count?: number;
+    };
     DagWarning: {
       /** @description The dag_id. */
       dag_id?: string;
@@ -4820,6 +4840,24 @@ export interface operations {
       404: components["responses"]["NotFound"];
     };
   };
+  get_dag_stats: {
+    parameters: {
+      query: {
+        /** One or more DAG IDs separated by commas to filter relevant Dags. */
+        dag_ids: string;
+      };
+    };
+    responses: {
+      /** Success. */
+      200: {
+        content: {
+          "application/json": components["schemas"]["DagStatsCollectionSchema"];
+        };
+      };
+      401: components["responses"]["Unauthenticated"];
+      403: components["responses"]["PermissionDenied"];
+    };
+  };
   /** Get a source code using file token. */
   get_dag_source: {
     parameters: {
@@ -5433,6 +5471,15 @@ export type UpdateDagRunState = CamelCasedPropertiesDeep<
 export type DAGRunCollection = CamelCasedPropertiesDeep<
   components["schemas"]["DAGRunCollection"]
 >;
+export type DagStatsCollectionSchema = CamelCasedPropertiesDeep<
+  components["schemas"]["DagStatsCollectionSchema"]
+>;
+export type DagStatsCollectionItem = CamelCasedPropertiesDeep<
+  components["schemas"]["DagStatsCollectionItem"]
+>;
+export type DagStatsStateCollectionItem = CamelCasedPropertiesDeep<
+  components["schemas"]["DagStatsStateCollectionItem"]
+>;
 export type DagWarning = CamelCasedPropertiesDeep<
   components["schemas"]["DagWarning"]
 >;
@@ -5894,6 +5941,9 @@ export type GetTasksVariables = CamelCasedPropertiesDeep<
 export type GetTaskVariables = CamelCasedPropertiesDeep<
   operations["get_task"]["parameters"]["path"]
 >;
+export type GetDagStatsVariables = CamelCasedPropertiesDeep<
+  operations["get_dag_stats"]["parameters"]["query"]
+>;
 export type GetDagSourceVariables = CamelCasedPropertiesDeep<
   operations["get_dag_source"]["parameters"]["path"]
 >;
diff --git a/tests/api_connexion/endpoints/test_dag_stats_endpoint.py b/tests/api_connexion/endpoints/test_dag_stats_endpoint.py
new file mode 100644
index 0000000000..631369bff4
--- /dev/null
+++ b/tests/api_connexion/endpoints/test_dag_stats_endpoint.py
@@ -0,0 +1,186 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import timedelta
+
+import pytest
+
+from airflow.models.dag import DAG, DagModel
+from airflow.models.dagrun import DagRun
+from airflow.security import permissions
+from airflow.utils import timezone
+from airflow.utils.session import create_session
+from airflow.utils.state import DagRunState
+from airflow.utils.types import DagRunType
+from tests.test_utils.api_connexion_utils import create_user, delete_user
+from tests.test_utils.db import clear_db_dags, clear_db_runs, clear_db_serialized_dags
+
+pytestmark = pytest.mark.db_test
+
+
+@pytest.fixture(scope="module")
+def configured_app(minimal_app_for_api):
+    app = minimal_app_for_api
+
+    create_user(
+        app,  # type: ignore
+        username="test",
+        role_name="Test",
+        permissions=[
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        ],
+    )
+
+    create_user(app, username="test_no_permissions", role_name="TestNoPermissions")  # type: ignore
+
+    yield app
+
+    delete_user(app, username="test")  # type: ignore
+    delete_user(app, username="test_no_permissions")  # type: ignore
+
+
+class TestDagStatsEndpoint:
+    default_time = "2020-06-11T18:00:00+00:00"
+
+    @pytest.fixture(autouse=True)
+    def setup_attrs(self, configured_app) -> None:
+        self.app = configured_app
+        self.client = self.app.test_client()  # type:ignore
+        clear_db_runs()
+        clear_db_serialized_dags()
+        clear_db_dags()
+
+    def teardown_method(self) -> None:
+        clear_db_runs()
+        clear_db_dags()
+        clear_db_serialized_dags()
+
+    def _create_dag(self, dag_id):
+        dag_instance = DagModel(dag_id=dag_id)
+        dag_instance.is_active = True
+        with create_session() as session:
+            session.add(dag_instance)
+        dag = DAG(dag_id=dag_id, schedule=None)
+        self.app.dag_bag.bag_dag(dag, root_dag=dag)
+        return dag_instance
+
+    def test_should_respond_200(self, session):
+        self._create_dag("dag_stats_dag")
+        self._create_dag("dag_stats_dag_2")
+        dag_1_run_1 = DagRun(
+            dag_id="dag_stats_dag",
+            run_id="test_dag_run_id_1",
+            run_type=DagRunType.MANUAL,
+            execution_date=timezone.parse(self.default_time),
+            start_date=timezone.parse(self.default_time),
+            external_trigger=True,
+            state="running",
+        )
+        dag_1_run_2 = DagRun(
+            dag_id="dag_stats_dag",
+            run_id="test_dag_run_id_2",
+            run_type=DagRunType.MANUAL,
+            execution_date=timezone.parse(self.default_time) + timedelta(days=1),
+            start_date=timezone.parse(self.default_time),
+            external_trigger=True,
+            state="failed",
+        )
+        dag_2_run_1 = DagRun(
+            dag_id="dag_stats_dag_2",
+            run_id="test_dag_2_run_id_1",
+            run_type=DagRunType.MANUAL,
+            execution_date=timezone.parse(self.default_time),
+            start_date=timezone.parse(self.default_time),
+            external_trigger=True,
+            state="queued",
+        )
+        session.add_all((dag_1_run_1, dag_1_run_2, dag_2_run_1))
+        session.commit()
+        exp_payload = {
+            "dags": [
+                {
+                    "dag_id": "dag_stats_dag",
+                    "stats": [
+                        {
+                            "state": DagRunState.QUEUED,
+                            "count": 0,
+                        },
+                        {
+                            "state": DagRunState.RUNNING,
+                            "count": 1,
+                        },
+                        {
+                            "state": DagRunState.SUCCESS,
+                            "count": 0,
+                        },
+                        {
+                            "state": DagRunState.FAILED,
+                            "count": 1,
+                        },
+                    ],
+                },
+                {
+                    "dag_id": "dag_stats_dag_2",
+                    "stats": [
+                        {
+                            "state": DagRunState.QUEUED,
+                            "count": 1,
+                        },
+                        {
+                            "state": DagRunState.RUNNING,
+                            "count": 0,
+                        },
+                        {
+                            "state": DagRunState.SUCCESS,
+                            "count": 0,
+                        },
+                        {
+                            "state": DagRunState.FAILED,
+                            "count": 0,
+                        },
+                    ],
+                },
+            ],
+            "total_entries": 2,
+        }
+
+        dag_ids = "dag_stats_dag,dag_stats_dag_2"
+        response = self.client.get(
+            f"api/v1/dagStats?dag_ids={dag_ids}", environ_overrides={"REMOTE_USER": "test"}
+        )
+        assert response.status_code == 200
+        assert len(response.json["dags"]) == 2
+        assert sorted(response.json["dags"], key=lambda d: d["dag_id"]) == sorted(
+            exp_payload["dags"], key=lambda d: d["dag_id"]
+        )
+        assert response.json["total_entries"] == 2
+
+    def test_should_raises_401_unauthenticated(self):
+        dag_ids = "dag_stats_dag,dag_stats_dag_2"
+        response = self.client.get(
+            f"api/v1/dagStats?dag_ids={dag_ids}", environ_overrides={"REMOTE_USER": "no_user"}
+        )
+        assert response.status_code == 401
+
+    def test_should_raises_403_no_permission(self):
+        dag_ids = "dag_stats_dag,dag_stats_dag_2"
+        response = self.client.get(
+            f"api/v1/dagStats?dag_ids={dag_ids}", environ_overrides={"REMOTE_USER": "test_no_permissions"}
+        )
+        assert response.status_code == 403
diff --git a/tests/api_connexion/schemas/test_dag_stats_schema.py b/tests/api_connexion/schemas/test_dag_stats_schema.py
new file mode 100644
index 0000000000..8899b3fb12
--- /dev/null
+++ b/tests/api_connexion/schemas/test_dag_stats_schema.py
@@ -0,0 +1,87 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from airflow.api_connexion.schemas.dag_stats_schema import (
+    dag_stats_collection_schema,
+    dag_stats_schema,
+    dag_stats_state_schema,
+)
+from airflow.utils.state import DagRunState
+
+
+class TestDagStatsStateSchema:
+    def test_dag_stats_state_schema(self):
+        payload = {
+            "state": DagRunState.RUNNING,
+            "count": 2,
+        }
+        serialized_data = dag_stats_state_schema.dump(payload)
+        assert serialized_data == payload
+
+
+class TestDagStatsSchema:
+    def test_dag_stats_schema(self):
+        payload = {
+            "dag_id": "test_dag_id",
+            "stats": [
+                {
+                    "state": DagRunState.QUEUED,
+                    "count": 2,
+                },
+                {
+                    "state": DagRunState.FAILED,
+                    "count": 1,
+                },
+            ],
+        }
+        serialized_data = dag_stats_schema.dump(payload)
+        assert serialized_data == payload
+
+
+class TestDagStatsCollectionSchema:
+    def test_dag_stats_collection_schema(self):
+        payload = {
+            "dags": [
+                {
+                    "dag_id": "test_dag_id",
+                    "stats": [
+                        {
+                            "state": DagRunState.RUNNING,
+                            "count": 2,
+                        },
+                        {
+                            "state": DagRunState.SUCCESS,
+                            "count": 1,
+                        },
+                    ],
+                },
+                {
+                    "dag_id": "test_dag_id_2",
+                    "stats": [
+                        {
+                            "state": DagRunState.RUNNING,
+                            "count": 2,
+                        }
+                    ],
+                },
+                {"dag_id": "test_dag_id_3", "stats": []},
+            ],
+            "total_entries": 3,
+        }
+        serialized_data = dag_stats_collection_schema.dump(payload)
+        assert serialized_data == payload

Reply via email to