This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 175b960ce0d AIP-84 Migrate GET Dag Runs endpoint to FastAPI (#43506)
175b960ce0d is described below

commit 175b960ce0dd5530d96c2ed3545d10cfdc07b8f1
Author: Kalyan R <[email protected]>
AuthorDate: Wed Nov 20 23:57:39 2024 +0530

    AIP-84 Migrate GET Dag Runs endpoint to FastAPI (#43506)
    
    * add list_dag_runs
    
    * use logical_date
    
    * add tests
    
    * wip - writing tests
    
    * add tests
    
    * fix tests
    
    * Update airflow/api_fastapi/core_api/routes/public/dag_run.py
    
    * add status
    
    * Small tweak
    
    ---------
    
    Co-authored-by: pierrejeambrun <[email protected]>
---
 .../api_connexion/endpoints/dag_run_endpoint.py    |   1 +
 airflow/api_fastapi/common/parameters.py           |  40 ++-
 airflow/api_fastapi/core_api/datamodels/dag_run.py |   7 +
 .../api_fastapi/core_api/openapi/v1-generated.yaml | 168 ++++++++++++
 .../api_fastapi/core_api/routes/public/dag_run.py  |  74 +++++-
 airflow/ui/openapi-gen/queries/common.ts           |  59 +++++
 airflow/ui/openapi-gen/queries/prefetch.ts         |  87 ++++++
 airflow/ui/openapi-gen/queries/queries.ts          |  96 +++++++
 airflow/ui/openapi-gen/queries/suspense.ts         |  96 +++++++
 airflow/ui/openapi-gen/requests/schemas.gen.ts     |  20 ++
 airflow/ui/openapi-gen/requests/services.gen.ts    |  56 ++++
 airflow/ui/openapi-gen/requests/types.gen.ts       |  53 ++++
 .../core_api/routes/public/test_dag_run.py         | 295 ++++++++++++++++++++-
 13 files changed, 1039 insertions(+), 13 deletions(-)

diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py 
b/airflow/api_connexion/endpoints/dag_run_endpoint.py
index b8e7f36d1fd..00dd8ca9071 100644
--- a/airflow/api_connexion/endpoints/dag_run_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -192,6 +192,7 @@ def _fetch_dag_runs(
     return session.scalars(query.offset(offset).limit(limit)).all(), 
total_entries
 
 
+@mark_fastapi_migration_done
 @security.requires_access_dag("GET", DagAccessEntity.RUN)
 @format_parameters(
     {
diff --git a/airflow/api_fastapi/common/parameters.py 
b/airflow/api_fastapi/common/parameters.py
index 942fa80e9db..6bfbfadf418 100644
--- a/airflow/api_fastapi/common/parameters.py
+++ b/airflow/api_fastapi/common/parameters.py
@@ -19,7 +19,19 @@ from __future__ import annotations
 
 from abc import ABC, abstractmethod
 from datetime import datetime
-from typing import TYPE_CHECKING, Annotated, Any, Callable, Generic, List, 
Optional, TypeVar, Union, overload
+from typing import (
+    TYPE_CHECKING,
+    Annotated,
+    Any,
+    Callable,
+    Generic,
+    Iterable,
+    List,
+    Optional,
+    TypeVar,
+    Union,
+    overload,
+)
 
 from fastapi import Depends, HTTPException, Query
 from pendulum.parsing.exceptions import ParserError
@@ -211,6 +223,7 @@ class SortParam(BaseParam[str]):
         "last_run_start_date": DagRun.start_date,
         "connection_id": Connection.conn_id,
         "import_error_id": ParseImportError.id,
+        "dag_run_id": DagRun.run_id,
     }
 
     def __init__(
@@ -309,6 +322,30 @@ class _OwnersFilter(BaseParam[List[str]]):
         return self.set_value(owners)
 
 
+class DagRunStateFilter(BaseParam[List[Optional[DagRunState]]]):
+    """Filter on Dag Run state."""
+
+    def to_orm(self, select: Select) -> Select:
+        if self.skip_none is False:
+            raise ValueError(f"Cannot set 'skip_none' to False on a 
{type(self)}")
+
+        if not self.value:
+            return select
+
+        conditions = [DagRun.state == state for state in self.value]
+        return select.where(or_(*conditions))
+
+    @staticmethod
+    def _convert_dag_run_states(states: Iterable[str] | None) -> 
list[DagRunState | None] | None:
+        if not states:
+            return None
+        return [None if s in ("none", None) else DagRunState(s) for s in 
states]
+
+    def depends(self, state: list[str] = Query(default_factory=list)) -> 
DagRunStateFilter:
+        states = self._convert_dag_run_states(state)
+        return self.set_value(states)
+
+
 class TIStateFilter(BaseParam[List[Optional[TaskInstanceState]]]):
     """Filter on task instance state."""
 
@@ -656,6 +693,7 @@ QueryOwnersFilter = Annotated[_OwnersFilter, 
Depends(_OwnersFilter().depends)]
 # DagRun
 QueryLastDagRunStateFilter = Annotated[_LastDagRunStateFilter, 
Depends(_LastDagRunStateFilter().depends)]
 QueryDagIdsFilter = Annotated[DagIdsFilter, 
Depends(DagIdsFilter(DagRun).depends)]
+QueryDagRunStateFilter = Annotated[DagRunStateFilter, 
Depends(DagRunStateFilter().depends)]
 
 # DAGWarning
 QueryDagIdInDagWarningFilter = Annotated[_DagIdFilter, 
Depends(_DagIdFilter(DagWarning.dag_id).depends)]
diff --git a/airflow/api_fastapi/core_api/datamodels/dag_run.py 
b/airflow/api_fastapi/core_api/datamodels/dag_run.py
index 8241885aff2..f3343e6c407 100644
--- a/airflow/api_fastapi/core_api/datamodels/dag_run.py
+++ b/airflow/api_fastapi/core_api/datamodels/dag_run.py
@@ -65,3 +65,10 @@ class DAGRunResponse(BaseModel):
     triggered_by: DagRunTriggeredByType
     conf: dict
     note: str | None
+
+
+class DAGRunCollectionResponse(BaseModel):
+    """DAG Run Collection serializer for responses."""
+
+    dag_runs: list[DAGRunResponse]
+    total_entries: int
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml 
b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index 2d26571b1e3..2f74f226892 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -1656,6 +1656,158 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/HTTPValidationError'
+  /public/dags/{dag_id}/dagRuns:
+    get:
+      tags:
+      - DagRun
+      summary: Get Dag Runs
+      description: 'Get all DAG Runs.
+
+
+        This endpoint allows specifying `~` as the dag_id to retrieve Dag Runs 
for
+        all DAGs.'
+      operationId: get_dag_runs
+      parameters:
+      - name: dag_id
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Dag Id
+      - name: limit
+        in: query
+        required: false
+        schema:
+          type: integer
+          minimum: 0
+          default: 100
+          title: Limit
+      - name: offset
+        in: query
+        required: false
+        schema:
+          type: integer
+          minimum: 0
+          default: 0
+          title: Offset
+      - name: logical_date_gte
+        in: query
+        required: false
+        schema:
+          anyOf:
+          - type: string
+            format: date-time
+          - type: 'null'
+          title: Logical Date Gte
+      - name: logical_date_lte
+        in: query
+        required: false
+        schema:
+          anyOf:
+          - type: string
+            format: date-time
+          - type: 'null'
+          title: Logical Date Lte
+      - name: start_date_gte
+        in: query
+        required: false
+        schema:
+          anyOf:
+          - type: string
+            format: date-time
+          - type: 'null'
+          title: Start Date Gte
+      - name: start_date_lte
+        in: query
+        required: false
+        schema:
+          anyOf:
+          - type: string
+            format: date-time
+          - type: 'null'
+          title: Start Date Lte
+      - name: end_date_gte
+        in: query
+        required: false
+        schema:
+          anyOf:
+          - type: string
+            format: date-time
+          - type: 'null'
+          title: End Date Gte
+      - name: end_date_lte
+        in: query
+        required: false
+        schema:
+          anyOf:
+          - type: string
+            format: date-time
+          - type: 'null'
+          title: End Date Lte
+      - name: updated_at_gte
+        in: query
+        required: false
+        schema:
+          anyOf:
+          - type: string
+            format: date-time
+          - type: 'null'
+          title: Updated At Gte
+      - name: updated_at_lte
+        in: query
+        required: false
+        schema:
+          anyOf:
+          - type: string
+            format: date-time
+          - type: 'null'
+          title: Updated At Lte
+      - name: state
+        in: query
+        required: false
+        schema:
+          type: array
+          items:
+            type: string
+          title: State
+      - name: order_by
+        in: query
+        required: false
+        schema:
+          type: string
+          default: id
+          title: Order By
+      responses:
+        '200':
+          description: Successful Response
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/DAGRunCollectionResponse'
+        '401':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Unauthorized
+        '403':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Forbidden
+        '404':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Not Found
+        '422':
+          description: Validation Error
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPValidationError'
   /public/dagSources/{dag_id}:
     get:
       tags:
@@ -5399,6 +5551,22 @@ components:
       type: object
       title: DAGRunClearBody
       description: DAG Run serializer for clear endpoint body.
+    DAGRunCollectionResponse:
+      properties:
+        dag_runs:
+          items:
+            $ref: '#/components/schemas/DAGRunResponse'
+          type: array
+          title: Dag Runs
+        total_entries:
+          type: integer
+          title: Total Entries
+      type: object
+      required:
+      - dag_runs
+      - total_entries
+      title: DAGRunCollectionResponse
+      description: DAG Run Collection serializer for responses.
     DAGRunPatchBody:
       properties:
         state:
diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py 
b/airflow/api_fastapi/core_api/routes/public/dag_run.py
index ce9c4410aeb..6ce60fe896d 100644
--- a/airflow/api_fastapi/core_api/routes/public/dag_run.py
+++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -28,11 +28,20 @@ from airflow.api.common.mark_tasks import (
     set_dag_run_state_to_queued,
     set_dag_run_state_to_success,
 )
-from airflow.api_fastapi.common.db.common import get_session
+from airflow.api_fastapi.common.db.common import get_session, paginated_select
+from airflow.api_fastapi.common.parameters import (
+    QueryDagRunStateFilter,
+    QueryLimit,
+    QueryOffset,
+    RangeFilter,
+    SortParam,
+    datetime_range_filter_factory,
+)
 from airflow.api_fastapi.common.router import AirflowRouter
 from airflow.api_fastapi.core_api.datamodels.assets import 
AssetEventCollectionResponse, AssetEventResponse
 from airflow.api_fastapi.core_api.datamodels.dag_run import (
     DAGRunClearBody,
+    DAGRunCollectionResponse,
     DAGRunPatchBody,
     DAGRunPatchStates,
     DAGRunResponse,
@@ -229,3 +238,66 @@ def clear_dag_run(
         )
         dag_run_cleared = session.scalar(select(DagRun).where(DagRun.id == 
dag_run.id))
         return DAGRunResponse.model_validate(dag_run_cleared, 
from_attributes=True)
+
+
+@dag_run_router.get("", 
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]))
+def get_dag_runs(
+    dag_id: str,
+    limit: QueryLimit,
+    offset: QueryOffset,
+    logical_date: Annotated[RangeFilter, 
Depends(datetime_range_filter_factory("logical_date", DagRun))],
+    start_date_range: Annotated[RangeFilter, 
Depends(datetime_range_filter_factory("start_date", DagRun))],
+    end_date_range: Annotated[RangeFilter, 
Depends(datetime_range_filter_factory("end_date", DagRun))],
+    update_at_range: Annotated[RangeFilter, 
Depends(datetime_range_filter_factory("updated_at", DagRun))],
+    state: QueryDagRunStateFilter,
+    order_by: Annotated[
+        SortParam,
+        Depends(
+            SortParam(
+                [
+                    "id",
+                    "state",
+                    "dag_id",
+                    "logical_date",
+                    "dag_run_id",
+                    "start_date",
+                    "end_date",
+                    "updated_at",
+                    "external_trigger",
+                    "conf",
+                ],
+                DagRun,
+            ).dynamic_depends(default="id")
+        ),
+    ],
+    session: Annotated[Session, Depends(get_session)],
+    request: Request,
+) -> DAGRunCollectionResponse:
+    """
+    Get all DAG Runs.
+
+    This endpoint allows specifying `~` as the dag_id to retrieve Dag Runs for 
all DAGs.
+    """
+    base_query = select(DagRun)
+
+    if dag_id != "~":
+        dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
+        if not dag:
+            raise HTTPException(status.HTTP_404_NOT_FOUND, f"The DAG with 
dag_id: `{dag_id}` was not found")
+
+        base_query = base_query.filter(DagRun.dag_id == dag_id)
+
+    dag_run_select, total_entries = paginated_select(
+        base_query,
+        [logical_date, start_date_range, end_date_range, update_at_range, 
state],
+        order_by,
+        offset,
+        limit,
+        session,
+    )
+
+    dag_runs = session.scalars(dag_run_select)
+    return DAGRunCollectionResponse(
+        dag_runs=[DAGRunResponse.model_validate(dag_run, from_attributes=True) 
for dag_run in dag_runs],
+        total_entries=total_entries,
+    )
diff --git a/airflow/ui/openapi-gen/queries/common.ts 
b/airflow/ui/openapi-gen/queries/common.ts
index fe281c5640f..73642521802 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -396,6 +396,65 @@ export const UseDagRunServiceGetUpstreamAssetEventsKeyFn = 
(
   useDagRunServiceGetUpstreamAssetEventsKey,
   ...(queryKey ?? [{ dagId, dagRunId }]),
 ];
+export type DagRunServiceGetDagRunsDefaultResponse = Awaited<
+  ReturnType<typeof DagRunService.getDagRuns>
+>;
+export type DagRunServiceGetDagRunsQueryResult<
+  TData = DagRunServiceGetDagRunsDefaultResponse,
+  TError = unknown,
+> = UseQueryResult<TData, TError>;
+export const useDagRunServiceGetDagRunsKey = "DagRunServiceGetDagRuns";
+export const UseDagRunServiceGetDagRunsKeyFn = (
+  {
+    dagId,
+    endDateGte,
+    endDateLte,
+    limit,
+    logicalDateGte,
+    logicalDateLte,
+    offset,
+    orderBy,
+    startDateGte,
+    startDateLte,
+    state,
+    updatedAtGte,
+    updatedAtLte,
+  }: {
+    dagId: string;
+    endDateGte?: string;
+    endDateLte?: string;
+    limit?: number;
+    logicalDateGte?: string;
+    logicalDateLte?: string;
+    offset?: number;
+    orderBy?: string;
+    startDateGte?: string;
+    startDateLte?: string;
+    state?: string[];
+    updatedAtGte?: string;
+    updatedAtLte?: string;
+  },
+  queryKey?: Array<unknown>,
+) => [
+  useDagRunServiceGetDagRunsKey,
+  ...(queryKey ?? [
+    {
+      dagId,
+      endDateGte,
+      endDateLte,
+      limit,
+      logicalDateGte,
+      logicalDateLte,
+      offset,
+      orderBy,
+      startDateGte,
+      startDateLte,
+      state,
+      updatedAtGte,
+      updatedAtLte,
+    },
+  ]),
+];
 export type DagSourceServiceGetDagSourceDefaultResponse = Awaited<
   ReturnType<typeof DagSourceService.getDagSource>
 >;
diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts 
b/airflow/ui/openapi-gen/queries/prefetch.ts
index f7872fcc7f8..d57dfe3d24d 100644
--- a/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -492,6 +492,93 @@ export const 
prefetchUseDagRunServiceGetUpstreamAssetEvents = (
     }),
     queryFn: () => DagRunService.getUpstreamAssetEvents({ dagId, dagRunId }),
   });
+/**
+ * Get Dag Runs
+ * Get all DAG Runs.
+ *
+ * This endpoint allows specifying `~` as the dag_id to retrieve Dag Runs for 
all DAGs.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.limit
+ * @param data.offset
+ * @param data.logicalDateGte
+ * @param data.logicalDateLte
+ * @param data.startDateGte
+ * @param data.startDateLte
+ * @param data.endDateGte
+ * @param data.endDateLte
+ * @param data.updatedAtGte
+ * @param data.updatedAtLte
+ * @param data.state
+ * @param data.orderBy
+ * @returns DAGRunCollectionResponse Successful Response
+ * @throws ApiError
+ */
+export const prefetchUseDagRunServiceGetDagRuns = (
+  queryClient: QueryClient,
+  {
+    dagId,
+    endDateGte,
+    endDateLte,
+    limit,
+    logicalDateGte,
+    logicalDateLte,
+    offset,
+    orderBy,
+    startDateGte,
+    startDateLte,
+    state,
+    updatedAtGte,
+    updatedAtLte,
+  }: {
+    dagId: string;
+    endDateGte?: string;
+    endDateLte?: string;
+    limit?: number;
+    logicalDateGte?: string;
+    logicalDateLte?: string;
+    offset?: number;
+    orderBy?: string;
+    startDateGte?: string;
+    startDateLte?: string;
+    state?: string[];
+    updatedAtGte?: string;
+    updatedAtLte?: string;
+  },
+) =>
+  queryClient.prefetchQuery({
+    queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({
+      dagId,
+      endDateGte,
+      endDateLte,
+      limit,
+      logicalDateGte,
+      logicalDateLte,
+      offset,
+      orderBy,
+      startDateGte,
+      startDateLte,
+      state,
+      updatedAtGte,
+      updatedAtLte,
+    }),
+    queryFn: () =>
+      DagRunService.getDagRuns({
+        dagId,
+        endDateGte,
+        endDateLte,
+        limit,
+        logicalDateGte,
+        logicalDateLte,
+        offset,
+        orderBy,
+        startDateGte,
+        startDateLte,
+        state,
+        updatedAtGte,
+        updatedAtLte,
+      }),
+  });
 /**
  * Get Dag Source
  * Get source code using file token.
diff --git a/airflow/ui/openapi-gen/queries/queries.ts 
b/airflow/ui/openapi-gen/queries/queries.ts
index 74e25c0258a..2ca159f465f 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -622,6 +622,102 @@ export const useDagRunServiceGetUpstreamAssetEvents = <
       DagRunService.getUpstreamAssetEvents({ dagId, dagRunId }) as TData,
     ...options,
   });
+/**
+ * Get Dag Runs
+ * Get all DAG Runs.
+ *
+ * This endpoint allows specifying `~` as the dag_id to retrieve Dag Runs for 
all DAGs.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.limit
+ * @param data.offset
+ * @param data.logicalDateGte
+ * @param data.logicalDateLte
+ * @param data.startDateGte
+ * @param data.startDateLte
+ * @param data.endDateGte
+ * @param data.endDateLte
+ * @param data.updatedAtGte
+ * @param data.updatedAtLte
+ * @param data.state
+ * @param data.orderBy
+ * @returns DAGRunCollectionResponse Successful Response
+ * @throws ApiError
+ */
+export const useDagRunServiceGetDagRuns = <
+  TData = Common.DagRunServiceGetDagRunsDefaultResponse,
+  TError = unknown,
+  TQueryKey extends Array<unknown> = unknown[],
+>(
+  {
+    dagId,
+    endDateGte,
+    endDateLte,
+    limit,
+    logicalDateGte,
+    logicalDateLte,
+    offset,
+    orderBy,
+    startDateGte,
+    startDateLte,
+    state,
+    updatedAtGte,
+    updatedAtLte,
+  }: {
+    dagId: string;
+    endDateGte?: string;
+    endDateLte?: string;
+    limit?: number;
+    logicalDateGte?: string;
+    logicalDateLte?: string;
+    offset?: number;
+    orderBy?: string;
+    startDateGte?: string;
+    startDateLte?: string;
+    state?: string[];
+    updatedAtGte?: string;
+    updatedAtLte?: string;
+  },
+  queryKey?: TQueryKey,
+  options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+  useQuery<TData, TError>({
+    queryKey: Common.UseDagRunServiceGetDagRunsKeyFn(
+      {
+        dagId,
+        endDateGte,
+        endDateLte,
+        limit,
+        logicalDateGte,
+        logicalDateLte,
+        offset,
+        orderBy,
+        startDateGte,
+        startDateLte,
+        state,
+        updatedAtGte,
+        updatedAtLte,
+      },
+      queryKey,
+    ),
+    queryFn: () =>
+      DagRunService.getDagRuns({
+        dagId,
+        endDateGte,
+        endDateLte,
+        limit,
+        logicalDateGte,
+        logicalDateLte,
+        offset,
+        orderBy,
+        startDateGte,
+        startDateLte,
+        state,
+        updatedAtGte,
+        updatedAtLte,
+      }) as TData,
+    ...options,
+  });
 /**
  * Get Dag Source
  * Get source code using file token.
diff --git a/airflow/ui/openapi-gen/queries/suspense.ts 
b/airflow/ui/openapi-gen/queries/suspense.ts
index 87b1a7aa6a2..50ccc8a3c68 100644
--- a/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow/ui/openapi-gen/queries/suspense.ts
@@ -604,6 +604,102 @@ export const 
useDagRunServiceGetUpstreamAssetEventsSuspense = <
       DagRunService.getUpstreamAssetEvents({ dagId, dagRunId }) as TData,
     ...options,
   });
+/**
+ * Get Dag Runs
+ * Get all DAG Runs.
+ *
+ * This endpoint allows specifying `~` as the dag_id to retrieve Dag Runs for 
all DAGs.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.limit
+ * @param data.offset
+ * @param data.logicalDateGte
+ * @param data.logicalDateLte
+ * @param data.startDateGte
+ * @param data.startDateLte
+ * @param data.endDateGte
+ * @param data.endDateLte
+ * @param data.updatedAtGte
+ * @param data.updatedAtLte
+ * @param data.state
+ * @param data.orderBy
+ * @returns DAGRunCollectionResponse Successful Response
+ * @throws ApiError
+ */
+export const useDagRunServiceGetDagRunsSuspense = <
+  TData = Common.DagRunServiceGetDagRunsDefaultResponse,
+  TError = unknown,
+  TQueryKey extends Array<unknown> = unknown[],
+>(
+  {
+    dagId,
+    endDateGte,
+    endDateLte,
+    limit,
+    logicalDateGte,
+    logicalDateLte,
+    offset,
+    orderBy,
+    startDateGte,
+    startDateLte,
+    state,
+    updatedAtGte,
+    updatedAtLte,
+  }: {
+    dagId: string;
+    endDateGte?: string;
+    endDateLte?: string;
+    limit?: number;
+    logicalDateGte?: string;
+    logicalDateLte?: string;
+    offset?: number;
+    orderBy?: string;
+    startDateGte?: string;
+    startDateLte?: string;
+    state?: string[];
+    updatedAtGte?: string;
+    updatedAtLte?: string;
+  },
+  queryKey?: TQueryKey,
+  options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+  useSuspenseQuery<TData, TError>({
+    queryKey: Common.UseDagRunServiceGetDagRunsKeyFn(
+      {
+        dagId,
+        endDateGte,
+        endDateLte,
+        limit,
+        logicalDateGte,
+        logicalDateLte,
+        offset,
+        orderBy,
+        startDateGte,
+        startDateLte,
+        state,
+        updatedAtGte,
+        updatedAtLte,
+      },
+      queryKey,
+    ),
+    queryFn: () =>
+      DagRunService.getDagRuns({
+        dagId,
+        endDateGte,
+        endDateLte,
+        limit,
+        logicalDateGte,
+        logicalDateLte,
+        offset,
+        orderBy,
+        startDateGte,
+        startDateLte,
+        state,
+        updatedAtGte,
+        updatedAtLte,
+      }) as TData,
+    ...options,
+  });
 /**
  * Get Dag Source
  * Get source code using file token.
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts 
b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 0f08034c533..a0bb85ace80 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -1404,6 +1404,26 @@ export const $DAGRunClearBody = {
   description: "DAG Run serializer for clear endpoint body.",
 } as const;
 
+export const $DAGRunCollectionResponse = {
+  properties: {
+    dag_runs: {
+      items: {
+        $ref: "#/components/schemas/DAGRunResponse",
+      },
+      type: "array",
+      title: "Dag Runs",
+    },
+    total_entries: {
+      type: "integer",
+      title: "Total Entries",
+    },
+  },
+  type: "object",
+  required: ["dag_runs", "total_entries"],
+  title: "DAGRunCollectionResponse",
+  description: "DAG Run Collection serializer for responses.",
+} as const;
+
 export const $DAGRunPatchBody = {
   properties: {
     state: {
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts 
b/airflow/ui/openapi-gen/requests/services.gen.ts
index 467c2961420..63272c5e0c4 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -63,6 +63,8 @@ import type {
   GetUpstreamAssetEventsResponse,
   ClearDagRunData,
   ClearDagRunResponse,
+  GetDagRunsData,
+  GetDagRunsResponse,
   GetDagSourceData,
   GetDagSourceResponse,
   GetDagStatsData,
@@ -1033,6 +1035,60 @@ export class DagRunService {
       },
     });
   }
+
+  /**
+   * Get Dag Runs
+   * Get all DAG Runs.
+   *
+   * This endpoint allows specifying `~` as the dag_id to retrieve Dag Runs 
for all DAGs.
+   * @param data The data for the request.
+   * @param data.dagId
+   * @param data.limit
+   * @param data.offset
+   * @param data.logicalDateGte
+   * @param data.logicalDateLte
+   * @param data.startDateGte
+   * @param data.startDateLte
+   * @param data.endDateGte
+   * @param data.endDateLte
+   * @param data.updatedAtGte
+   * @param data.updatedAtLte
+   * @param data.state
+   * @param data.orderBy
+   * @returns DAGRunCollectionResponse Successful Response
+   * @throws ApiError
+   */
+  public static getDagRuns(
+    data: GetDagRunsData,
+  ): CancelablePromise<GetDagRunsResponse> {
+    return __request(OpenAPI, {
+      method: "GET",
+      url: "/public/dags/{dag_id}/dagRuns",
+      path: {
+        dag_id: data.dagId,
+      },
+      query: {
+        limit: data.limit,
+        offset: data.offset,
+        logical_date_gte: data.logicalDateGte,
+        logical_date_lte: data.logicalDateLte,
+        start_date_gte: data.startDateGte,
+        start_date_lte: data.startDateLte,
+        end_date_gte: data.endDateGte,
+        end_date_lte: data.endDateLte,
+        updated_at_gte: data.updatedAtGte,
+        updated_at_lte: data.updatedAtLte,
+        state: data.state,
+        order_by: data.orderBy,
+      },
+      errors: {
+        401: "Unauthorized",
+        403: "Forbidden",
+        404: "Not Found",
+        422: "Validation Error",
+      },
+    });
+  }
 }
 
 export class DagSourceService {
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts 
b/airflow/ui/openapi-gen/requests/types.gen.ts
index 21ad2839bc8..926932379b3 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -313,6 +313,14 @@ export type DAGRunClearBody = {
   dry_run?: boolean;
 };
 
+/**
+ * DAG Run Collection serializer for responses.
+ */
+export type DAGRunCollectionResponse = {
+  dag_runs: Array<DAGRunResponse>;
+  total_entries: number;
+};
+
 /**
  * DAG Run Serializer for PATCH requests.
  */
@@ -1300,6 +1308,24 @@ export type ClearDagRunResponse =
   | TaskInstanceCollectionResponse
   | DAGRunResponse;
 
+export type GetDagRunsData = {
+  dagId: string;
+  endDateGte?: string | null;
+  endDateLte?: string | null;
+  limit?: number;
+  logicalDateGte?: string | null;
+  logicalDateLte?: string | null;
+  offset?: number;
+  orderBy?: string;
+  startDateGte?: string | null;
+  startDateLte?: string | null;
+  state?: Array<string>;
+  updatedAtGte?: string | null;
+  updatedAtLte?: string | null;
+};
+
+export type GetDagRunsResponse = DAGRunCollectionResponse;
+
 export type GetDagSourceData = {
   accept?: "application/json" | "text/plain" | "*/*";
   dagId: string;
@@ -2459,6 +2485,33 @@ export type $OpenApiTs = {
       };
     };
   };
+  "/public/dags/{dag_id}/dagRuns": {
+    get: {
+      req: GetDagRunsData;
+      res: {
+        /**
+         * Successful Response
+         */
+        200: DAGRunCollectionResponse;
+        /**
+         * Unauthorized
+         */
+        401: HTTPExceptionResponse;
+        /**
+         * Forbidden
+         */
+        403: HTTPExceptionResponse;
+        /**
+         * Not Found
+         */
+        404: HTTPExceptionResponse;
+        /**
+         * Validation Error
+         */
+        422: HTTPValidationError;
+      };
+    };
+  };
   "/public/dagSources/{dag_id}": {
     get: {
       req: GetDagSourceData;
diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py 
b/tests/api_fastapi/core_api/routes/public/test_dag_run.py
index 89705ba85ab..2ac22a02e31 100644
--- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -17,7 +17,7 @@
 
 from __future__ import annotations
 
-from datetime import datetime, timezone
+from datetime import datetime, timedelta, timezone
 
 import pytest
 from sqlalchemy import select
@@ -52,9 +52,12 @@ DAG1_RUN1_TRIGGERED_BY = DagRunTriggeredByType.UI
 DAG1_RUN2_TRIGGERED_BY = DagRunTriggeredByType.ASSET
 DAG2_RUN1_TRIGGERED_BY = DagRunTriggeredByType.CLI
 DAG2_RUN2_TRIGGERED_BY = DagRunTriggeredByType.REST_API
-START_DATE = datetime(2024, 6, 15, 0, 0, tzinfo=timezone.utc)
-END_DATE = datetime(2024, 6, 15, 0, 0, tzinfo=timezone.utc)
-EXECUTION_DATE = datetime(2024, 6, 16, 0, 0, tzinfo=timezone.utc)
+START_DATE1 = datetime(2024, 1, 15, 0, 0, tzinfo=timezone.utc)
+LOGICAL_DATE1 = datetime(2024, 2, 16, 0, 0, tzinfo=timezone.utc)
+LOGICAL_DATE2 = datetime(2024, 2, 20, 0, 0, tzinfo=timezone.utc)
+START_DATE2 = datetime(2024, 4, 15, 0, 0, tzinfo=timezone.utc)
+LOGICAL_DATE3 = datetime(2024, 5, 16, 0, 0, tzinfo=timezone.utc)
+LOGICAL_DATE4 = datetime(2024, 5, 25, 0, 0, tzinfo=timezone.utc)
 DAG1_RUN1_NOTE = "test_note"
 
 
@@ -68,7 +71,7 @@ def setup(dag_maker, session=None):
     with dag_maker(
         DAG1_ID,
         schedule="@daily",
-        start_date=START_DATE,
+        start_date=START_DATE1,
     ):
         task1 = EmptyOperator(task_id="task_1")
     dag_run1 = dag_maker.create_dagrun(
@@ -76,6 +79,7 @@ def setup(dag_maker, session=None):
         state=DAG1_RUN1_STATE,
         run_type=DAG1_RUN1_RUN_TYPE,
         triggered_by=DAG1_RUN1_TRIGGERED_BY,
+        logical_date=LOGICAL_DATE1,
     )
 
     dag_run1.note = (DAG1_RUN1_NOTE, 1)
@@ -89,13 +93,13 @@ def setup(dag_maker, session=None):
         state=DAG1_RUN2_STATE,
         run_type=DAG1_RUN2_RUN_TYPE,
         triggered_by=DAG1_RUN2_TRIGGERED_BY,
-        logical_date=EXECUTION_DATE,
+        logical_date=LOGICAL_DATE2,
     )
 
     with dag_maker(
         DAG2_ID,
         schedule=None,
-        start_date=START_DATE,
+        start_date=START_DATE2,
     ):
         EmptyOperator(task_id="task_2")
     dag_maker.create_dagrun(
@@ -103,14 +107,14 @@ def setup(dag_maker, session=None):
         state=DAG2_RUN1_STATE,
         run_type=DAG2_RUN1_RUN_TYPE,
         triggered_by=DAG2_RUN1_TRIGGERED_BY,
-        logical_date=EXECUTION_DATE,
+        logical_date=LOGICAL_DATE3,
     )
     dag_maker.create_dagrun(
         run_id=DAG2_RUN2_ID,
         state=DAG2_RUN2_STATE,
         run_type=DAG2_RUN2_RUN_TYPE,
         triggered_by=DAG2_RUN2_TRIGGERED_BY,
-        logical_date=EXECUTION_DATE,
+        logical_date=LOGICAL_DATE4,
     )
 
     dag_maker.dagbag.sync_to_db()
@@ -156,6 +160,275 @@ class TestGetDagRun:
         assert body["detail"] == "The DagRun with dag_id: `test_dag1` and 
run_id: `invalid` was not found"
 
 
+class TestGetDagRuns:
+    @staticmethod
+    def parse_datetime(datetime_str):
+        return datetime_str.isoformat().replace("+00:00", "Z") if datetime_str 
else None
+
+    @staticmethod
+    def get_dag_run_dict(run: DagRun):
+        return {
+            "run_id": run.run_id,
+            "dag_id": run.dag_id,
+            "logical_date": TestGetDagRuns.parse_datetime(run.logical_date),
+            "queued_at": TestGetDagRuns.parse_datetime(run.queued_at),
+            "start_date": TestGetDagRuns.parse_datetime(run.start_date),
+            "end_date": TestGetDagRuns.parse_datetime(run.end_date),
+            "data_interval_start": 
TestGetDagRuns.parse_datetime(run.data_interval_start),
+            "data_interval_end": 
TestGetDagRuns.parse_datetime(run.data_interval_end),
+            "last_scheduling_decision": 
TestGetDagRuns.parse_datetime(run.last_scheduling_decision),
+            "run_type": run.run_type,
+            "state": run.state,
+            "external_trigger": run.external_trigger,
+            "triggered_by": run.triggered_by.value,
+            "conf": run.conf,
+            "note": run.note,
+        }
+
+    @pytest.mark.parametrize("dag_id, total_entries", [(DAG1_ID, 2), (DAG2_ID, 
2), ("~", 4)])
+    def test_get_dag_runs(self, test_client, session, dag_id, total_entries):
+        response = test_client.get(f"/public/dags/{dag_id}/dagRuns")
+        assert response.status_code == 200
+        body = response.json()
+        assert body["total_entries"] == total_entries
+        for each in body["dag_runs"]:
+            run = (
+                session.query(DagRun)
+                .where(DagRun.dag_id == each["dag_id"], DagRun.run_id == 
each["run_id"])
+                .one()
+            )
+            expected = self.get_dag_run_dict(run)
+            assert each == expected
+
+    def test_get_dag_runs_not_found(self, test_client):
+        response = test_client.get("/public/dags/invalid/dagRuns")
+        assert response.status_code == 404
+        body = response.json()
+        assert body["detail"] == "The DAG with dag_id: `invalid` was not found"
+
+    def test_invalid_order_by_raises_400(self, test_client):
+        response = 
test_client.get("/public/dags/test_dag1/dagRuns?order_by=invalid")
+        assert response.status_code == 400
+        body = response.json()
+        assert (
+            body["detail"]
+            == "Ordering with 'invalid' is disallowed or the attribute does 
not exist on the model"
+        )
+
+    @pytest.mark.parametrize(
+        "order_by, expected_dag_id_order",
+        [
+            ("id", [DAG1_RUN1_ID, DAG1_RUN2_ID]),
+            ("state", [DAG1_RUN2_ID, DAG1_RUN1_ID]),
+            ("dag_id", [DAG1_RUN1_ID, DAG1_RUN2_ID]),
+            ("logical_date", [DAG1_RUN1_ID, DAG1_RUN2_ID]),
+            ("dag_run_id", [DAG1_RUN1_ID, DAG1_RUN2_ID]),
+            ("start_date", [DAG1_RUN1_ID, DAG1_RUN2_ID]),
+            ("end_date", [DAG1_RUN1_ID, DAG1_RUN2_ID]),
+            ("updated_at", [DAG1_RUN1_ID, DAG1_RUN2_ID]),
+            ("external_trigger", [DAG1_RUN1_ID, DAG1_RUN2_ID]),
+            ("conf", [DAG1_RUN1_ID, DAG1_RUN2_ID]),
+        ],
+    )
+    def test_return_correct_results_with_order_by(self, test_client, order_by, 
expected_dag_id_order):
+        response = test_client.get("/public/dags/test_dag1/dagRuns", 
params={"order_by": order_by})
+        assert response.status_code == 200
+        body = response.json()
+        assert body["total_entries"] == 2
+        assert [each["run_id"] for each in body["dag_runs"]] == 
expected_dag_id_order
+
+    @pytest.mark.parametrize(
+        "query_params, expected_dag_id_order",
+        [
+            ({}, [DAG1_RUN1_ID, DAG1_RUN2_ID]),
+            ({"limit": 1}, [DAG1_RUN1_ID]),
+            ({"limit": 3}, [DAG1_RUN1_ID, DAG1_RUN2_ID]),
+            ({"offset": 1}, [DAG1_RUN2_ID]),
+            ({"offset": 2}, []),
+            ({"limit": 1, "offset": 1}, [DAG1_RUN2_ID]),
+            ({"limit": 1, "offset": 2}, []),
+        ],
+    )
+    def test_limit_and_offset(self, test_client, query_params, 
expected_dag_id_order):
+        response = test_client.get("/public/dags/test_dag1/dagRuns", 
params=query_params)
+        assert response.status_code == 200
+        body = response.json()
+        assert body["total_entries"] == 2
+        assert [each["run_id"] for each in body["dag_runs"]] == 
expected_dag_id_order
+
+    @pytest.mark.parametrize(
+        "query_params, expected_detail",
+        [
+            (
+                {"limit": 1, "offset": -1},
+                [
+                    {
+                        "type": "greater_than_equal",
+                        "loc": ["query", "offset"],
+                        "msg": "Input should be greater than or equal to 0",
+                        "input": "-1",
+                        "ctx": {"ge": 0},
+                    }
+                ],
+            ),
+            (
+                {"limit": -1, "offset": 1},
+                [
+                    {
+                        "type": "greater_than_equal",
+                        "loc": ["query", "limit"],
+                        "msg": "Input should be greater than or equal to 0",
+                        "input": "-1",
+                        "ctx": {"ge": 0},
+                    }
+                ],
+            ),
+            (
+                {"limit": -1, "offset": -1},
+                [
+                    {
+                        "type": "greater_than_equal",
+                        "loc": ["query", "limit"],
+                        "msg": "Input should be greater than or equal to 0",
+                        "input": "-1",
+                        "ctx": {"ge": 0},
+                    },
+                    {
+                        "type": "greater_than_equal",
+                        "loc": ["query", "offset"],
+                        "msg": "Input should be greater than or equal to 0",
+                        "input": "-1",
+                        "ctx": {"ge": 0},
+                    },
+                ],
+            ),
+        ],
+    )
+    def test_bad_limit_and_offset(self, test_client, query_params, 
expected_detail):
+        response = test_client.get("/public/dags/test_dag1/dagRuns", 
params=query_params)
+        assert response.status_code == 422
+        assert response.json()["detail"] == expected_detail
+
+    @pytest.mark.parametrize(
+        "dag_id, query_params, expected_dag_id_list",
+        [
+            (DAG1_ID, {"logical_date_gte": LOGICAL_DATE1.isoformat()}, 
[DAG1_RUN1_ID, DAG1_RUN2_ID]),
+            (DAG2_ID, {"logical_date_lte": LOGICAL_DATE3.isoformat()}, 
[DAG2_RUN1_ID]),
+            (
+                "~",
+                {
+                    "start_date_gte": START_DATE1.isoformat(),
+                    "start_date_lte": (START_DATE2 - 
timedelta(days=1)).isoformat(),
+                },
+                [DAG1_RUN1_ID, DAG1_RUN2_ID],
+            ),
+            (
+                DAG1_ID,
+                {
+                    "end_date_gte": START_DATE2.isoformat(),
+                    "end_date_lte": (datetime.now(tz=timezone.utc) + 
timedelta(days=1)).isoformat(),
+                },
+                [DAG1_RUN1_ID, DAG1_RUN2_ID],
+            ),
+            (
+                DAG1_ID,
+                {
+                    "logical_date_gte": LOGICAL_DATE1.isoformat(),
+                    "logical_date_lte": LOGICAL_DATE2.isoformat(),
+                },
+                [DAG1_RUN1_ID, DAG1_RUN2_ID],
+            ),
+            (
+                DAG2_ID,
+                {
+                    "start_date_gte": START_DATE2.isoformat(),
+                    "end_date_lte": (datetime.now(tz=timezone.utc) + 
timedelta(days=1)).isoformat(),
+                },
+                [DAG2_RUN1_ID, DAG2_RUN2_ID],
+            ),
+            (DAG1_ID, {"state": DagRunState.SUCCESS.value}, [DAG1_RUN1_ID]),
+            (DAG2_ID, {"state": DagRunState.FAILED.value}, []),
+            (
+                DAG1_ID,
+                {"state": DagRunState.SUCCESS.value, "logical_date_gte": 
LOGICAL_DATE1.isoformat()},
+                [DAG1_RUN1_ID],
+            ),
+            (
+                DAG1_ID,
+                {"state": DagRunState.FAILED.value, "start_date_gte": 
START_DATE1.isoformat()},
+                [DAG1_RUN2_ID],
+            ),
+        ],
+    )
+    def test_filters(self, test_client, dag_id, query_params, 
expected_dag_id_list):
+        response = test_client.get(f"/public/dags/{dag_id}/dagRuns", 
params=query_params)
+        assert response.status_code == 200
+        body = response.json()
+        assert [each["run_id"] for each in body["dag_runs"]] == 
expected_dag_id_list
+
+    def test_bad_filters(self, test_client):
+        query_params = {
+            "logical_date_gte": "invalid",
+            "start_date_gte": "invalid",
+            "end_date_gte": "invalid",
+            "logical_date_lte": "invalid",
+            "start_date_lte": "invalid",
+            "end_date_lte": "invalid",
+        }
+        expected_detail = [
+            {
+                "type": "datetime_from_date_parsing",
+                "loc": ["query", "logical_date_gte"],
+                "msg": "Input should be a valid datetime or date, input is too 
short",
+                "input": "invalid",
+                "ctx": {"error": "input is too short"},
+            },
+            {
+                "type": "datetime_from_date_parsing",
+                "loc": ["query", "logical_date_lte"],
+                "msg": "Input should be a valid datetime or date, input is too 
short",
+                "input": "invalid",
+                "ctx": {"error": "input is too short"},
+            },
+            {
+                "type": "datetime_from_date_parsing",
+                "loc": ["query", "start_date_gte"],
+                "msg": "Input should be a valid datetime or date, input is too 
short",
+                "input": "invalid",
+                "ctx": {"error": "input is too short"},
+            },
+            {
+                "type": "datetime_from_date_parsing",
+                "loc": ["query", "start_date_lte"],
+                "msg": "Input should be a valid datetime or date, input is too 
short",
+                "input": "invalid",
+                "ctx": {"error": "input is too short"},
+            },
+            {
+                "type": "datetime_from_date_parsing",
+                "loc": ["query", "end_date_gte"],
+                "msg": "Input should be a valid datetime or date, input is too 
short",
+                "input": "invalid",
+                "ctx": {"error": "input is too short"},
+            },
+            {
+                "type": "datetime_from_date_parsing",
+                "loc": ["query", "end_date_lte"],
+                "msg": "Input should be a valid datetime or date, input is too 
short",
+                "input": "invalid",
+                "ctx": {"error": "input is too short"},
+            },
+        ]
+        response = test_client.get(f"/public/dags/{DAG1_ID}/dagRuns", 
params=query_params)
+        assert response.status_code == 422
+        body = response.json()
+        assert body["detail"] == expected_detail
+
+    def test_invalid_state(self, test_client):
+        with pytest.raises(ValueError, match="'invalid' is not a valid 
DagRunState"):
+            test_client.get(f"/public/dags/{DAG1_ID}/dagRuns", 
params={"state": "invalid"})
+
+
 class TestPatchDagRun:
     @pytest.mark.parametrize(
         "dag_id, run_id, patch_body, response_body",
@@ -271,7 +544,7 @@ class TestGetDagRunAssetTriggerEvents:
     def test_should_respond_200(self, test_client, dag_maker, session):
         asset1 = Asset(uri="ds1")
 
-        with dag_maker(dag_id="source_dag", start_date=START_DATE, 
session=session):
+        with dag_maker(dag_id="source_dag", start_date=START_DATE1, 
session=session):
             EmptyOperator(task_id="task", outlets=[asset1])
         dr = dag_maker.create_dagrun()
         ti = dr.task_instances[0]
@@ -286,7 +559,7 @@ class TestGetDagRunAssetTriggerEvents:
         )
         session.add(event)
 
-        with dag_maker(dag_id="TEST_DAG_ID", start_date=START_DATE, 
session=session):
+        with dag_maker(dag_id="TEST_DAG_ID", start_date=START_DATE1, 
session=session):
             pass
         dr = dag_maker.create_dagrun(run_id="TEST_DAG_RUN_ID", 
run_type=DagRunType.ASSET_TRIGGERED)
         dr.consumed_asset_events.append(event)


Reply via email to