This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new efcb554fbc AIP-84 Migrate GET Dag Run endpoint to FastAPI (#42725)
efcb554fbc is described below
commit efcb554fbc3e3150e3cf9e11d80148ce57cf63be
Author: Kalyan R <[email protected]>
AuthorDate: Thu Oct 10 13:33:45 2024 +0530
AIP-84 Migrate GET Dag Run endpoint to FastAPI (#42725)
* get dag_run init
* add serializer
* Merge branch 'main' of https://github.com/apache/airflow into kalyan/AIP-84/get_dag_run
* add types
* add test
* working tests
* add note to DagRunResponse
* add note
* add test to test non Null note
* Update airflow/api_fastapi/views/public/dag_run.py
Co-authored-by: Pierre Jeambrun <[email protected]>
* Update airflow/api_fastapi/views/public/dag_run.py
Co-authored-by: Pierre Jeambrun <[email protected]>
* Merge branch 'main' of https://github.com/apache/airflow into kalyan/AIP-84/get_dag_run
* add 404 test
---------
Co-authored-by: Pierre Jeambrun <[email protected]>
---
.../api_connexion/endpoints/dag_run_endpoint.py | 2 +
airflow/api_fastapi/openapi/v1-generated.yaml | 153 +++++++++++++++++++
.../public/__init__.py => serializers/dag_run.py} | 30 +++-
airflow/api_fastapi/views/public/__init__.py | 2 +
airflow/api_fastapi/views/public/dag_run.py | 44 ++++++
airflow/ui/openapi-gen/queries/common.ts | 19 +++
airflow/ui/openapi-gen/queries/prefetch.ts | 23 +++
airflow/ui/openapi-gen/queries/queries.ts | 32 ++++
airflow/ui/openapi-gen/queries/suspense.ts | 32 ++++
airflow/ui/openapi-gen/requests/schemas.gen.ts | 162 +++++++++++++++++++++
airflow/ui/openapi-gen/requests/services.gen.ts | 31 ++++
airflow/ui/openapi-gen/requests/types.gen.ts | 78 ++++++++++
tests/api_fastapi/views/public/test_dag_run.py | 137 +++++++++++++++++
13 files changed, 737 insertions(+), 8 deletions(-)
diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py
index 44891c0ef2..a862b7c969 100644
--- a/airflow/api_connexion/endpoints/dag_run_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -63,6 +63,7 @@ from airflow.exceptions import ParamValidationError
from airflow.models import DagModel, DagRun
from airflow.timetables.base import DataInterval
from airflow.utils.airflow_flask_app import get_airflow_app
+from airflow.utils.api_migration import mark_fastapi_migration_done
from airflow.utils.db import get_query_count
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import DagRunState
@@ -90,6 +91,7 @@ def delete_dag_run(*, dag_id: str, dag_run_id: str, session: Session = NEW_SESSI
return NoContent, HTTPStatus.NO_CONTENT
+@mark_fastapi_migration_done
@security.requires_access_dag("GET", DagAccessEntity.RUN)
@provide_session
def get_dag_run(
diff --git a/airflow/api_fastapi/openapi/v1-generated.yaml b/airflow/api_fastapi/openapi/v1-generated.yaml
index fb19c1abd1..7debfbb100 100644
--- a/airflow/api_fastapi/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/openapi/v1-generated.yaml
@@ -629,6 +629,56 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
+ /public/dags/{dag_id}/dagRuns/{dag_run_id}:
+ get:
+ tags:
+ - DagRun
+ summary: Get Dag Run
+ operationId: get_dag_run
+ parameters:
+ - name: dag_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Id
+ - name: dag_run_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Run Id
+ responses:
+ '200':
+ description: Successful Response
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/DAGRunResponse'
+ '401':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Unauthorized
+ '403':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Forbidden
+ '404':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Not Found
+ '422':
+ description: Validation Error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPValidationError'
components:
schemas:
ConnectionResponse:
@@ -1097,6 +1147,87 @@ components:
- file_token
title: DAGResponse
description: DAG serializer for responses.
+ DAGRunResponse:
+ properties:
+ run_id:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Run Id
+ dag_id:
+ type: string
+ title: Dag Id
+ logical_date:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: Logical Date
+ start_date:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: Start Date
+ end_date:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: End Date
+ data_interval_start:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: Data Interval Start
+ data_interval_end:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: Data Interval End
+ last_scheduling_decision:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: Last Scheduling Decision
+ run_type:
+ $ref: '#/components/schemas/DagRunType'
+ state:
+ $ref: '#/components/schemas/DagRunState'
+ external_trigger:
+ type: boolean
+ title: External Trigger
+ triggered_by:
+ $ref: '#/components/schemas/DagRunTriggeredByType'
+ conf:
+ type: object
+ title: Conf
+ note:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Note
+ type: object
+ required:
+ - run_id
+ - dag_id
+ - logical_date
+ - start_date
+ - end_date
+ - data_interval_start
+ - data_interval_end
+ - last_scheduling_decision
+ - run_type
+ - state
+ - external_trigger
+ - triggered_by
+ - conf
+ - note
+ title: DAGRunResponse
+ description: DAG Run serializer for responses.
DAGRunStates:
properties:
queued:
@@ -1157,6 +1288,28 @@ components:
so please ensure that their values always match the ones with the
same name in TaskInstanceState.'
+ DagRunTriggeredByType:
+ type: string
+ enum:
+ - cli
+ - operator
+ - rest_api
+ - ui
+ - test
+ - timetable
+ - dataset
+ - backfill
+ title: DagRunTriggeredByType
+ description: Class with TriggeredBy types for DagRun.
+ DagRunType:
+ type: string
+ enum:
+ - backfill
+ - scheduled
+ - manual
+ - dataset_triggered
+ title: DagRunType
+ description: Class with DagRun types.
DagTagPydantic:
properties:
name:
diff --git a/airflow/api_fastapi/views/public/__init__.py b/airflow/api_fastapi/serializers/dag_run.py
similarity index 54%
copy from airflow/api_fastapi/views/public/__init__.py
copy to airflow/api_fastapi/serializers/dag_run.py
index 4e02d9ab43..4622fac645 100644
--- a/airflow/api_fastapi/views/public/__init__.py
+++ b/airflow/api_fastapi/serializers/dag_run.py
@@ -17,14 +17,28 @@
from __future__ import annotations
-from airflow.api_fastapi.views.public.connections import connections_router
-from airflow.api_fastapi.views.public.dags import dags_router
-from airflow.api_fastapi.views.public.variables import variables_router
-from airflow.api_fastapi.views.router import AirflowRouter
+from datetime import datetime
-public_router = AirflowRouter(prefix="/public")
+from pydantic import BaseModel, Field
+from airflow.utils.state import DagRunState
+from airflow.utils.types import DagRunTriggeredByType, DagRunType
-public_router.include_router(dags_router)
-public_router.include_router(connections_router)
-public_router.include_router(variables_router)
+
+class DAGRunResponse(BaseModel):
+ """DAG Run serializer for responses."""
+
+ dag_run_id: str | None = Field(alias="run_id")
+ dag_id: str
+ logical_date: datetime | None
+ start_date: datetime | None
+ end_date: datetime | None
+ data_interval_start: datetime | None
+ data_interval_end: datetime | None
+ last_scheduling_decision: datetime | None
+ run_type: DagRunType
+ state: DagRunState
+ external_trigger: bool
+ triggered_by: DagRunTriggeredByType
+ conf: dict
+ note: str | None
diff --git a/airflow/api_fastapi/views/public/__init__.py b/airflow/api_fastapi/views/public/__init__.py
index 4e02d9ab43..9d90a09668 100644
--- a/airflow/api_fastapi/views/public/__init__.py
+++ b/airflow/api_fastapi/views/public/__init__.py
@@ -18,6 +18,7 @@
from __future__ import annotations
from airflow.api_fastapi.views.public.connections import connections_router
+from airflow.api_fastapi.views.public.dag_run import dag_run_router
from airflow.api_fastapi.views.public.dags import dags_router
from airflow.api_fastapi.views.public.variables import variables_router
from airflow.api_fastapi.views.router import AirflowRouter
@@ -28,3 +29,4 @@ public_router = AirflowRouter(prefix="/public")
public_router.include_router(dags_router)
public_router.include_router(connections_router)
public_router.include_router(variables_router)
+public_router.include_router(dag_run_router)
diff --git a/airflow/api_fastapi/views/public/dag_run.py b/airflow/api_fastapi/views/public/dag_run.py
new file mode 100644
index 0000000000..d39fb6f2f3
--- /dev/null
+++ b/airflow/api_fastapi/views/public/dag_run.py
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from fastapi import Depends, HTTPException
+from sqlalchemy import select
+from sqlalchemy.orm import Session
+from typing_extensions import Annotated
+
+from airflow.api_fastapi.db.common import get_session
+from airflow.api_fastapi.openapi.exceptions import create_openapi_http_exception_doc
+from airflow.api_fastapi.serializers.dag_run import DAGRunResponse
+from airflow.api_fastapi.views.router import AirflowRouter
+from airflow.models import DagRun
+
+dag_run_router = AirflowRouter(tags=["DagRun"], prefix="/dags/{dag_id}/dagRuns")
+
+
+@dag_run_router.get("/{dag_run_id}",
responses=create_openapi_http_exception_doc([401, 403, 404]))
+async def get_dag_run(
+ dag_id: str, dag_run_id: str, session: Annotated[Session, Depends(get_session)]
+) -> DAGRunResponse:
+ dag_run = session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=dag_run_id))
+ if dag_run is None:
+ raise HTTPException(
+ 404, f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` was not found"
+ )
+
+ return DAGRunResponse.model_validate(dag_run, from_attributes=True)
diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts
index a4d65c6900..aaff196c07 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -4,6 +4,7 @@ import { UseQueryResult } from "@tanstack/react-query";
import {
AssetService,
ConnectionService,
+ DagRunService,
DagService,
DashboardService,
VariableService,
@@ -166,6 +167,24 @@ export const UseVariableServiceGetVariableKeyFn = (
},
queryKey?: Array<unknown>,
) => [useVariableServiceGetVariableKey, ...(queryKey ?? [{ variableKey }])];
+export type DagRunServiceGetDagRunDefaultResponse = Awaited<
+ ReturnType<typeof DagRunService.getDagRun>
+>;
+export type DagRunServiceGetDagRunQueryResult<
+ TData = DagRunServiceGetDagRunDefaultResponse,
+ TError = unknown,
+> = UseQueryResult<TData, TError>;
+export const useDagRunServiceGetDagRunKey = "DagRunServiceGetDagRun";
+export const UseDagRunServiceGetDagRunKeyFn = (
+ {
+ dagId,
+ dagRunId,
+ }: {
+ dagId: string;
+ dagRunId: string;
+ },
+ queryKey?: Array<unknown>,
+) => [useDagRunServiceGetDagRunKey, ...(queryKey ?? [{ dagId, dagRunId }])];
export type DagServicePatchDagsMutationResult = Awaited<
ReturnType<typeof DagService.patchDags>
>;
diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts
index 8bd691ca33..3e194302f4 100644
--- a/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -4,6 +4,7 @@ import { type QueryClient } from "@tanstack/react-query";
import {
AssetService,
ConnectionService,
+ DagRunService,
DagService,
DashboardService,
VariableService,
@@ -206,3 +207,25 @@ export const prefetchUseVariableServiceGetVariable = (
queryKey: Common.UseVariableServiceGetVariableKeyFn({ variableKey }),
queryFn: () => VariableService.getVariable({ variableKey }),
});
+/**
+ * Get Dag Run
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.dagRunId
+ * @returns DAGRunResponse Successful Response
+ * @throws ApiError
+ */
+export const prefetchUseDagRunServiceGetDagRun = (
+ queryClient: QueryClient,
+ {
+ dagId,
+ dagRunId,
+ }: {
+ dagId: string;
+ dagRunId: string;
+ },
+) =>
+ queryClient.prefetchQuery({
+ queryKey: Common.UseDagRunServiceGetDagRunKeyFn({ dagId, dagRunId }),
+ queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }),
+ });
diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts
index 51b8f4fb05..19bb17b342 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -9,6 +9,7 @@ import {
import {
AssetService,
ConnectionService,
+ DagRunService,
DagService,
DashboardService,
VariableService,
@@ -263,6 +264,37 @@ export const useVariableServiceGetVariable = <
queryFn: () => VariableService.getVariable({ variableKey }) as TData,
...options,
});
+/**
+ * Get Dag Run
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.dagRunId
+ * @returns DAGRunResponse Successful Response
+ * @throws ApiError
+ */
+export const useDagRunServiceGetDagRun = <
+ TData = Common.DagRunServiceGetDagRunDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ dagId,
+ dagRunId,
+ }: {
+ dagId: string;
+ dagRunId: string;
+ },
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useQuery<TData, TError>({
+ queryKey: Common.UseDagRunServiceGetDagRunKeyFn(
+ { dagId, dagRunId },
+ queryKey,
+ ),
+ queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }) as TData,
+ ...options,
+ });
/**
* Patch Dags
* Patch multiple DAGs.
diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts
index b437007468..79ad479f0a 100644
--- a/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow/ui/openapi-gen/queries/suspense.ts
@@ -4,6 +4,7 @@ import { UseQueryOptions, useSuspenseQuery } from "@tanstack/react-query";
import {
AssetService,
ConnectionService,
+ DagRunService,
DagService,
DashboardService,
VariableService,
@@ -258,3 +259,34 @@ export const useVariableServiceGetVariableSuspense = <
queryFn: () => VariableService.getVariable({ variableKey }) as TData,
...options,
});
+/**
+ * Get Dag Run
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.dagRunId
+ * @returns DAGRunResponse Successful Response
+ * @throws ApiError
+ */
+export const useDagRunServiceGetDagRunSuspense = <
+ TData = Common.DagRunServiceGetDagRunDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ dagId,
+ dagRunId,
+ }: {
+ dagId: string;
+ dagRunId: string;
+ },
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useSuspenseQuery<TData, TError>({
+ queryKey: Common.UseDagRunServiceGetDagRunKeyFn(
+ { dagId, dagRunId },
+ queryKey,
+ ),
+ queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }) as TData,
+ ...options,
+ });
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 8f76ebd13c..18df528465 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -784,6 +784,145 @@ export const $DAGResponse = {
description: "DAG serializer for responses.",
} as const;
+export const $DAGRunResponse = {
+ properties: {
+ run_id: {
+ anyOf: [
+ {
+ type: "string",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Run Id",
+ },
+ dag_id: {
+ type: "string",
+ title: "Dag Id",
+ },
+ logical_date: {
+ anyOf: [
+ {
+ type: "string",
+ format: "date-time",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Logical Date",
+ },
+ start_date: {
+ anyOf: [
+ {
+ type: "string",
+ format: "date-time",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Start Date",
+ },
+ end_date: {
+ anyOf: [
+ {
+ type: "string",
+ format: "date-time",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "End Date",
+ },
+ data_interval_start: {
+ anyOf: [
+ {
+ type: "string",
+ format: "date-time",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Data Interval Start",
+ },
+ data_interval_end: {
+ anyOf: [
+ {
+ type: "string",
+ format: "date-time",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Data Interval End",
+ },
+ last_scheduling_decision: {
+ anyOf: [
+ {
+ type: "string",
+ format: "date-time",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Last Scheduling Decision",
+ },
+ run_type: {
+ $ref: "#/components/schemas/DagRunType",
+ },
+ state: {
+ $ref: "#/components/schemas/DagRunState",
+ },
+ external_trigger: {
+ type: "boolean",
+ title: "External Trigger",
+ },
+ triggered_by: {
+ $ref: "#/components/schemas/DagRunTriggeredByType",
+ },
+ conf: {
+ type: "object",
+ title: "Conf",
+ },
+ note: {
+ anyOf: [
+ {
+ type: "string",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Note",
+ },
+ },
+ type: "object",
+ required: [
+ "run_id",
+ "dag_id",
+ "logical_date",
+ "start_date",
+ "end_date",
+ "data_interval_start",
+ "data_interval_end",
+ "last_scheduling_decision",
+ "run_type",
+ "state",
+ "external_trigger",
+ "triggered_by",
+ "conf",
+ "note",
+ ],
+ title: "DAGRunResponse",
+ description: "DAG Run serializer for responses.",
+} as const;
+
export const $DAGRunStates = {
properties: {
queued: {
@@ -845,6 +984,29 @@ so please ensure that their values always match the ones with the
same name in TaskInstanceState.`,
} as const;
+export const $DagRunTriggeredByType = {
+ type: "string",
+ enum: [
+ "cli",
+ "operator",
+ "rest_api",
+ "ui",
+ "test",
+ "timetable",
+ "dataset",
+ "backfill",
+ ],
+ title: "DagRunTriggeredByType",
+ description: "Class with TriggeredBy types for DagRun.",
+} as const;
+
+export const $DagRunType = {
+ type: "string",
+ enum: ["backfill", "scheduled", "manual", "dataset_triggered"],
+ title: "DagRunType",
+ description: "Class with DagRun types.",
+} as const;
+
export const $DagTagPydantic = {
properties: {
name: {
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts
index 24fbb9c29c..9a126aef25 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -25,6 +25,8 @@ import type {
DeleteVariableResponse,
GetVariableData,
GetVariableResponse,
+ GetDagRunData,
+ GetDagRunResponse,
} from "./types.gen";
export class AssetService {
@@ -361,3 +363,32 @@ export class VariableService {
});
}
}
+
+export class DagRunService {
+ /**
+ * Get Dag Run
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.dagRunId
+ * @returns DAGRunResponse Successful Response
+ * @throws ApiError
+ */
+ public static getDagRun(
+ data: GetDagRunData,
+ ): CancelablePromise<GetDagRunResponse> {
+ return __request(OpenAPI, {
+ method: "GET",
+ url: "/public/dags/{dag_id}/dagRuns/{dag_run_id}",
+ path: {
+ dag_id: data.dagId,
+ dag_run_id: data.dagRunId,
+ },
+ errors: {
+ 401: "Unauthorized",
+ 403: "Forbidden",
+ 404: "Not Found",
+ 422: "Validation Error",
+ },
+ });
+ }
+}
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts
index 368c981b9d..45bfa51aec 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -120,6 +120,28 @@ export type DAGResponse = {
readonly file_token: string;
};
+/**
+ * DAG Run serializer for responses.
+ */
+export type DAGRunResponse = {
+ run_id: string | null;
+ dag_id: string;
+ logical_date: string | null;
+ start_date: string | null;
+ end_date: string | null;
+ data_interval_start: string | null;
+ data_interval_end: string | null;
+ last_scheduling_decision: string | null;
+ run_type: DagRunType;
+ state: DagRunState;
+ external_trigger: boolean;
+ triggered_by: DagRunTriggeredByType;
+ conf: {
+ [key: string]: unknown;
+ };
+ note: string | null;
+};
+
/**
* DAG Run States for responses.
*/
@@ -149,6 +171,28 @@ export type DAGRunTypes = {
*/
export type DagRunState = "queued" | "running" | "success" | "failed";
+/**
+ * Class with TriggeredBy types for DagRun.
+ */
+export type DagRunTriggeredByType =
+ | "cli"
+ | "operator"
+ | "rest_api"
+ | "ui"
+ | "test"
+ | "timetable"
+ | "dataset"
+ | "backfill";
+
+/**
+ * Class with DagRun types.
+ */
+export type DagRunType =
+ | "backfill"
+ | "scheduled"
+ | "manual"
+ | "dataset_triggered";
+
/**
 * Serializable representation of the DagTag ORM SqlAlchemyModel used by internal API.
*/
@@ -304,6 +348,13 @@ export type GetVariableData = {
export type GetVariableResponse = VariableResponse;
+export type GetDagRunData = {
+ dagId: string;
+ dagRunId: string;
+};
+
+export type GetDagRunResponse = DAGRunResponse;
+
export type $OpenApiTs = {
"/ui/next_run_assets/{dag_id}": {
get: {
@@ -580,4 +631,31 @@ export type $OpenApiTs = {
};
};
};
+ "/public/dags/{dag_id}/dagRuns/{dag_run_id}": {
+ get: {
+ req: GetDagRunData;
+ res: {
+ /**
+ * Successful Response
+ */
+ 200: DAGRunResponse;
+ /**
+ * Unauthorized
+ */
+ 401: HTTPExceptionResponse;
+ /**
+ * Forbidden
+ */
+ 403: HTTPExceptionResponse;
+ /**
+ * Not Found
+ */
+ 404: HTTPExceptionResponse;
+ /**
+ * Validation Error
+ */
+ 422: HTTPValidationError;
+ };
+ };
+ };
};
diff --git a/tests/api_fastapi/views/public/test_dag_run.py b/tests/api_fastapi/views/public/test_dag_run.py
new file mode 100644
index 0000000000..dab8190706
--- /dev/null
+++ b/tests/api_fastapi/views/public/test_dag_run.py
@@ -0,0 +1,137 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from datetime import datetime, timezone
+
+import pytest
+
+from airflow.operators.empty import EmptyOperator
+from airflow.utils.session import provide_session
+from airflow.utils.state import DagRunState
+from airflow.utils.types import DagRunTriggeredByType, DagRunType
+from tests.test_utils.db import clear_db_dags, clear_db_runs, clear_db_serialized_dags
+
+pytestmark = pytest.mark.db_test
+
+DAG1_ID = "test_dag1"
+DAG2_ID = "test_dag2"
+DAG1_RUN1_ID = "dag_run_1"
+DAG1_RUN2_ID = "dag_run_2"
+DAG2_RUN1_ID = "dag_run_3"
+DAG2_RUN2_ID = "dag_run_4"
+DAG1_RUN1_STATE = DagRunState.SUCCESS
+DAG1_RUN2_STATE = DagRunState.FAILED
+DAG2_RUN1_STATE = DagRunState.SUCCESS
+DAG2_RUN2_STATE = DagRunState.SUCCESS
+DAG1_RUN1_RUN_TYPE = DagRunType.MANUAL
+DAG1_RUN2_RUN_TYPE = DagRunType.SCHEDULED
+DAG2_RUN1_RUN_TYPE = DagRunType.BACKFILL_JOB
+DAG2_RUN2_RUN_TYPE = DagRunType.DATASET_TRIGGERED
+DAG1_RUN1_TRIGGERED_BY = DagRunTriggeredByType.UI
+DAG1_RUN2_TRIGGERED_BY = DagRunTriggeredByType.DATASET
+DAG2_RUN1_TRIGGERED_BY = DagRunTriggeredByType.CLI
+DAG2_RUN2_TRIGGERED_BY = DagRunTriggeredByType.REST_API
+START_DATE = datetime(2024, 6, 15, 0, 0, tzinfo=timezone.utc)
+EXECUTION_DATE = datetime(2024, 6, 16, 0, 0, tzinfo=timezone.utc)
+DAG1_NOTE = "test_note"
+
+
[email protected](autouse=True)
+@provide_session
+def setup(dag_maker, session=None):
+ clear_db_runs()
+ clear_db_dags()
+ clear_db_serialized_dags()
+
+ with dag_maker(
+ DAG1_ID,
+ schedule="@daily",
+ start_date=START_DATE,
+ ):
+ EmptyOperator(task_id="task_1")
+ dag1 = dag_maker.create_dagrun(
+ run_id=DAG1_RUN1_ID,
+ state=DAG1_RUN1_STATE,
+ run_type=DAG1_RUN1_RUN_TYPE,
+ triggered_by=DAG1_RUN1_TRIGGERED_BY,
+ )
+ dag1.note = (DAG1_NOTE, 1)
+
+ dag_maker.create_dagrun(
+ run_id=DAG1_RUN2_ID,
+ state=DAG1_RUN2_STATE,
+ run_type=DAG1_RUN2_RUN_TYPE,
+ triggered_by=DAG1_RUN2_TRIGGERED_BY,
+ execution_date=EXECUTION_DATE,
+ )
+
+ with dag_maker(
+ DAG2_ID,
+ schedule=None,
+ start_date=START_DATE,
+ ):
+ EmptyOperator(task_id="task_2")
+ dag_maker.create_dagrun(
+ run_id=DAG2_RUN1_ID,
+ state=DAG2_RUN1_STATE,
+ run_type=DAG2_RUN1_RUN_TYPE,
+ triggered_by=DAG2_RUN1_TRIGGERED_BY,
+ execution_date=EXECUTION_DATE,
+ )
+ dag_maker.create_dagrun(
+ run_id=DAG2_RUN2_ID,
+ state=DAG2_RUN2_STATE,
+ run_type=DAG2_RUN2_RUN_TYPE,
+ triggered_by=DAG2_RUN2_TRIGGERED_BY,
+ execution_date=EXECUTION_DATE,
+ )
+
+ dag_maker.dagbag.sync_to_db()
+ dag_maker.dag_model
+ dag_maker.dag_model.has_task_concurrency_limits = True
+ session.merge(dag_maker.dag_model)
+ session.commit()
+
+
[email protected](
+ "dag_id, run_id, state, run_type, triggered_by, dag_run_note",
+ [
+ (DAG1_ID, DAG1_RUN1_ID, DAG1_RUN1_STATE, DAG1_RUN1_RUN_TYPE, DAG1_RUN1_TRIGGERED_BY, DAG1_NOTE),
+ (DAG1_ID, DAG1_RUN2_ID, DAG1_RUN2_STATE, DAG1_RUN2_RUN_TYPE, DAG1_RUN2_TRIGGERED_BY, None),
+ (DAG2_ID, DAG2_RUN1_ID, DAG2_RUN1_STATE, DAG2_RUN1_RUN_TYPE, DAG2_RUN1_TRIGGERED_BY, None),
+ (DAG2_ID, DAG2_RUN2_ID, DAG2_RUN2_STATE, DAG2_RUN2_RUN_TYPE, DAG2_RUN2_TRIGGERED_BY, None),
+ ],
+)
+def test_get_dag_run(test_client, dag_id, run_id, state, run_type, triggered_by, dag_run_note):
+ response = test_client.get(f"/public/dags/{dag_id}/dagRuns/{run_id}")
+ assert response.status_code == 200
+ body = response.json()
+ assert body["dag_id"] == dag_id
+ assert body["run_id"] == run_id
+ assert body["state"] == state
+ assert body["run_type"] == run_type
+ assert body["triggered_by"] == triggered_by.value
+ assert body["note"] == dag_run_note
+
+
+def test_get_dag_run_not_found(test_client):
+ response = test_client.get(f"/public/dags/{DAG1_ID}/dagRuns/invalid")
+ assert response.status_code == 404
+ body = response.json()
+ assert body["detail"] == "The DagRun with dag_id: `test_dag1` and run_id: `invalid` was not found"