This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ad54501f3fc Make dagStats endpoint dag_ids parameter optional (#42955)
ad54501f3fc is described below

commit ad54501f3fccde549c7699f8ce46c711e38accc0
Author: michaeljs-c <[email protected]>
AuthorDate: Sun Oct 13 13:57:51 2024 +0100

    Make dagStats endpoint dag_ids parameter optional (#42955)
    
    * Make dagStats dag_ids parameter optional with pagination
    
    * move pagination out of sql query
    
    * tidy
    
    ---------
    
    Co-authored-by: Michael Smith-Chandler <[email protected]>
---
 .../api_connexion/endpoints/dag_stats_endpoint.py  |  40 +++--
 airflow/api_connexion/openapi/v1.yaml              |   4 +-
 airflow/www/static/js/types/api-generated.ts       |   6 +-
 .../endpoints/test_dag_stats_endpoint.py           | 194 ++++++++++++++++++++-
 4 files changed, 227 insertions(+), 17 deletions(-)

diff --git a/airflow/api_connexion/endpoints/dag_stats_endpoint.py 
b/airflow/api_connexion/endpoints/dag_stats_endpoint.py
index 705af10d41d..3b6c6ab8e0d 100644
--- a/airflow/api_connexion/endpoints/dag_stats_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_stats_endpoint.py
@@ -39,24 +39,40 @@ if TYPE_CHECKING:
 
 @security.requires_access_dag("GET", DagAccessEntity.RUN)
 @provide_session
-def get_dag_stats(*, dag_ids: str, session: Session = NEW_SESSION) -> 
APIResponse:
+def get_dag_stats(
+    *,
+    dag_ids: str | None = None,
+    limit: int | None = None,
+    offset: int | None = None,
+    session: Session = NEW_SESSION,
+) -> APIResponse:
     """Get Dag statistics."""
     allowed_dag_ids = 
get_auth_manager().get_permitted_dag_ids(methods=["GET"], user=g.user)
-    dags_list = set(dag_ids.split(","))
-    filter_dag_ids = dags_list.intersection(allowed_dag_ids)
+    if dag_ids:
+        dags_list = set(dag_ids.split(","))
+        filter_dag_ids = dags_list.intersection(allowed_dag_ids)
+    else:
+        filter_dag_ids = allowed_dag_ids
+    query_dag_ids = sorted(list(filter_dag_ids))
+    if offset is not None:
+        query_dag_ids = query_dag_ids[offset:]
+    if limit is not None:
+        query_dag_ids = query_dag_ids[:limit]
 
     query = (
         select(DagRun.dag_id, DagRun.state, func.count(DagRun.state))
         .group_by(DagRun.dag_id, DagRun.state)
-        .where(DagRun.dag_id.in_(filter_dag_ids))
+        .where(DagRun.dag_id.in_(query_dag_ids))
     )
     dag_state_stats = session.execute(query)
-
     dag_state_data = {(dag_id, state): count for dag_id, state, count in 
dag_state_stats}
-    dag_stats = {
-        dag_id: [{"state": state, "count": dag_state_data.get((dag_id, state), 
0)} for state in DagRunState]
-        for dag_id in filter_dag_ids
-    }
-
-    dags = [{"dag_id": stat, "stats": dag_stats[stat]} for stat in dag_stats]
-    return dag_stats_collection_schema.dump({"dags": dags, "total_entries": 
len(dag_stats)})
+    dags = [
+        {
+            "dag_id": dag_id,
+            "stats": [
+                {"state": state, "count": dag_state_data.get((dag_id, state), 
0)} for state in DagRunState
+            ],
+        }
+        for dag_id in query_dag_ids
+    ]
+    return dag_stats_collection_schema.dump({"dags": dags, "total_entries": 
len(dags)})
diff --git a/airflow/api_connexion/openapi/v1.yaml 
b/airflow/api_connexion/openapi/v1.yaml
index b39d1cd955d..e99f91639c4 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -2384,11 +2384,13 @@ paths:
       operationId: get_dag_stats
       tags: [DagStats]
       parameters:
+        - $ref: "#/components/parameters/PageLimit"
+        - $ref: "#/components/parameters/PageOffset"
         - name: dag_ids
           in: query
           schema:
             type: string
-          required: true
+          required: false
           description: |
             One or more DAG IDs separated by commas to filter relevant Dags.
       responses:
diff --git a/airflow/www/static/js/types/api-generated.ts 
b/airflow/www/static/js/types/api-generated.ts
index 15391c29424..ef45dbd3b57 100644
--- a/airflow/www/static/js/types/api-generated.ts
+++ b/airflow/www/static/js/types/api-generated.ts
@@ -4962,8 +4962,12 @@ export interface operations {
   get_dag_stats: {
     parameters: {
       query: {
+        /** The numbers of items to return. */
+        limit?: components["parameters"]["PageLimit"];
+        /** The number of items to skip before starting to collect the result 
set. */
+        offset?: components["parameters"]["PageOffset"];
         /** One or more DAG IDs separated by commas to filter relevant Dags. */
-        dag_ids: string;
+        dag_ids?: string;
       };
     };
     responses: {
diff --git a/tests/api_connexion/endpoints/test_dag_stats_endpoint.py 
b/tests/api_connexion/endpoints/test_dag_stats_endpoint.py
index a447e2a6a4b..fe563b94440 100644
--- a/tests/api_connexion/endpoints/test_dag_stats_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_stats_endpoint.py
@@ -76,9 +76,10 @@ class TestDagStatsEndpoint:
         self.app.dag_bag.bag_dag(dag)
         return dag_instance
 
-    def test_should_respond_200(self, session):
+    def _create_dag_runs(self, session):
         self._create_dag("dag_stats_dag")
         self._create_dag("dag_stats_dag_2")
+        self._create_dag("dag_stats_dag_3")
         dag_1_run_1 = DagRun(
             dag_id="dag_stats_dag",
             run_id="test_dag_run_id_1",
@@ -106,8 +107,20 @@ class TestDagStatsEndpoint:
             external_trigger=True,
             state="queued",
         )
-        session.add_all((dag_1_run_1, dag_1_run_2, dag_2_run_1))
+        dag_3_run_1 = DagRun(
+            dag_id="dag_stats_dag_3",
+            run_id="test_dag_3_run_id_1",
+            run_type=DagRunType.MANUAL,
+            execution_date=timezone.parse(self.default_time),
+            start_date=timezone.parse(self.default_time),
+            external_trigger=True,
+            state="success",
+        )
+        session.add_all((dag_1_run_1, dag_1_run_2, dag_2_run_1, dag_3_run_1))
         session.commit()
+
+    def test_should_respond_200(self, session):
+        self._create_dag_runs(session)
         exp_payload = {
             "dags": [
                 {
@@ -165,7 +178,182 @@ class TestDagStatsEndpoint:
         assert sorted(response.json["dags"], key=lambda d: d["dag_id"]) == 
sorted(
             exp_payload["dags"], key=lambda d: d["dag_id"]
         )
-        response.json["total_entries"] == 2
+        assert response.json["total_entries"] == 2
+
+    @pytest.mark.parametrize(
+        "url, exp_payload",
+        [
+            (
+                "api/v1/dagStats",
+                {
+                    "dags": [
+                        {
+                            "dag_id": "dag_stats_dag",
+                            "stats": [
+                                {
+                                    "state": DagRunState.QUEUED,
+                                    "count": 0,
+                                },
+                                {
+                                    "state": DagRunState.RUNNING,
+                                    "count": 1,
+                                },
+                                {
+                                    "state": DagRunState.SUCCESS,
+                                    "count": 0,
+                                },
+                                {
+                                    "state": DagRunState.FAILED,
+                                    "count": 1,
+                                },
+                            ],
+                        },
+                        {
+                            "dag_id": "dag_stats_dag_2",
+                            "stats": [
+                                {
+                                    "state": DagRunState.QUEUED,
+                                    "count": 1,
+                                },
+                                {
+                                    "state": DagRunState.RUNNING,
+                                    "count": 0,
+                                },
+                                {
+                                    "state": DagRunState.SUCCESS,
+                                    "count": 0,
+                                },
+                                {
+                                    "state": DagRunState.FAILED,
+                                    "count": 0,
+                                },
+                            ],
+                        },
+                        {
+                            "dag_id": "dag_stats_dag_3",
+                            "stats": [
+                                {
+                                    "state": DagRunState.QUEUED,
+                                    "count": 0,
+                                },
+                                {
+                                    "state": DagRunState.RUNNING,
+                                    "count": 0,
+                                },
+                                {
+                                    "state": DagRunState.SUCCESS,
+                                    "count": 1,
+                                },
+                                {
+                                    "state": DagRunState.FAILED,
+                                    "count": 0,
+                                },
+                            ],
+                        },
+                    ],
+                    "total_entries": 3,
+                },
+            ),
+            (
+                "api/v1/dagStats?limit=1",
+                {
+                    "dags": [
+                        {
+                            "dag_id": "dag_stats_dag",
+                            "stats": [
+                                {
+                                    "state": DagRunState.QUEUED,
+                                    "count": 0,
+                                },
+                                {
+                                    "state": DagRunState.RUNNING,
+                                    "count": 1,
+                                },
+                                {
+                                    "state": DagRunState.SUCCESS,
+                                    "count": 0,
+                                },
+                                {
+                                    "state": DagRunState.FAILED,
+                                    "count": 1,
+                                },
+                            ],
+                        }
+                    ],
+                    "total_entries": 1,
+                },
+            ),
+            (
+                "api/v1/dagStats?offset=2",
+                {
+                    "dags": [
+                        {
+                            "dag_id": "dag_stats_dag_3",
+                            "stats": [
+                                {
+                                    "state": DagRunState.QUEUED,
+                                    "count": 0,
+                                },
+                                {
+                                    "state": DagRunState.RUNNING,
+                                    "count": 0,
+                                },
+                                {
+                                    "state": DagRunState.SUCCESS,
+                                    "count": 1,
+                                },
+                                {
+                                    "state": DagRunState.FAILED,
+                                    "count": 0,
+                                },
+                            ],
+                        },
+                    ],
+                    "total_entries": 1,
+                },
+            ),
+            (
+                "api/v1/dagStats?offset=1&limit=1",
+                {
+                    "dags": [
+                        {
+                            "dag_id": "dag_stats_dag_2",
+                            "stats": [
+                                {
+                                    "state": DagRunState.QUEUED,
+                                    "count": 1,
+                                },
+                                {
+                                    "state": DagRunState.RUNNING,
+                                    "count": 0,
+                                },
+                                {
+                                    "state": DagRunState.SUCCESS,
+                                    "count": 0,
+                                },
+                                {
+                                    "state": DagRunState.FAILED,
+                                    "count": 0,
+                                },
+                            ],
+                        },
+                    ],
+                    "total_entries": 1,
+                },
+            ),
+            ("api/v1/dagStats?offset=10&limit=1", {"dags": [], 
"total_entries": 0}),
+        ],
+    )
+    def test_optional_dag_ids_with_limit_offset(self, url, exp_payload, 
session):
+        self._create_dag_runs(session)
+
+        response = self.client.get(url, environ_overrides={"REMOTE_USER": 
"test"})
+        num_dags = len(exp_payload["dags"])
+        assert response.status_code == 200
+        assert sorted(response.json["dags"], key=lambda d: d["dag_id"]) == 
sorted(
+            exp_payload["dags"], key=lambda d: d["dag_id"]
+        )
+        assert response.json["total_entries"] == num_dags
 
     def test_should_raises_401_unauthenticated(self):
         dag_ids = "dag_stats_dag,dag_stats_dag_2"

Reply via email to