This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c3aabba0488 AIP-84 Migrate Clear Dag Run public endpoint to FastAPI (#42975)
c3aabba0488 is described below
commit c3aabba04881e5d25fe3b6ef7727fb52d28b8a2c
Author: Kalyan R <[email protected]>
AuthorDate: Fri Nov 15 13:52:37 2024 +0530
AIP-84 Migrate Clear Dag Run public endpoint to FastAPI (#42975)
* add clear_dag_run
* add tests
* Merge branch 'main' of https://github.com/apache/airflow into
kalyan/API-84/clear_dag_run
* add ti response
* add
* use logical_date
* fix tests
* remove async
* Update airflow/api_fastapi/core_api/routes/public/dag_run.py
Co-authored-by: Pierre Jeambrun <[email protected]>
* Update airflow/api_fastapi/core_api/datamodels/dag_run.py
Co-authored-by: Pierre Jeambrun <[email protected]>
* remove type ignore
* update ti state and assert it
* reuse state
* remove breakpoint
* feedback
---------
Co-authored-by: Pierre Jeambrun <[email protected]>
---
.../api_connexion/endpoints/dag_run_endpoint.py | 1 +
airflow/api_fastapi/core_api/datamodels/dag_run.py | 6 ++
.../api_fastapi/core_api/openapi/v1-generated.yaml | 68 ++++++++++++++++++++++
.../api_fastapi/core_api/routes/public/dag_run.py | 53 +++++++++++++++++
airflow/models/dag.py | 36 +++++++++++-
airflow/ui/openapi-gen/queries/common.ts | 3 +
airflow/ui/openapi-gen/queries/queries.ts | 47 +++++++++++++++
airflow/ui/openapi-gen/requests/schemas.gen.ts | 13 +++++
airflow/ui/openapi-gen/requests/services.gen.ts | 32 ++++++++++
airflow/ui/openapi-gen/requests/types.gen.ts | 44 ++++++++++++++
.../core_api/routes/public/test_dag_run.py | 51 +++++++++++++++-
11 files changed, 351 insertions(+), 3 deletions(-)
diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py
b/airflow/api_connexion/endpoints/dag_run_endpoint.py
index 6a38eb27ff4..0c0a0322e62 100644
--- a/airflow/api_connexion/endpoints/dag_run_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -403,6 +403,7 @@ def update_dag_run_state(*, dag_id: str, dag_run_id: str,
session: Session = NEW
return dagrun_schema.dump(dag_run)
+@mark_fastapi_migration_done
@security.requires_access_dag("PUT", DagAccessEntity.RUN)
@action_logging
@provide_session
diff --git a/airflow/api_fastapi/core_api/datamodels/dag_run.py
b/airflow/api_fastapi/core_api/datamodels/dag_run.py
index 102567f6997..8241885aff2 100644
--- a/airflow/api_fastapi/core_api/datamodels/dag_run.py
+++ b/airflow/api_fastapi/core_api/datamodels/dag_run.py
@@ -41,6 +41,12 @@ class DAGRunPatchBody(BaseModel):
note: str | None = Field(None, max_length=1000)
+class DAGRunClearBody(BaseModel):
+ """DAG Run serializer for clear endpoint body."""
+
+ # NOTE(review): the docstring above is mirrored verbatim as the schema description in the
+ # generated OpenAPI spec and UI client, so changing it requires regenerating them.
+ # dry_run=True (the default, so an empty JSON body ``{}`` is also a dry run): the clear
+ # endpoint only returns the task instances that would be cleared and leaves the run as is.
+ # dry_run=False: the task instances are actually cleared and the updated DAG run is returned.
+ dry_run: bool = True
+
+
class DAGRunResponse(BaseModel):
"""DAG Run serializer for responses."""
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index 890fb6b6c8d..56e40a7899a 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -1164,6 +1164,65 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
+ /public/dags/{dag_id}/dagRuns/{dag_run_id}/clear:
+ post:
+ tags:
+ - DagRun
+ summary: Clear Dag Run
+ operationId: clear_dag_run
+ parameters:
+ - name: dag_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Id
+ - name: dag_run_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Run Id
+ requestBody:
+ required: true
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/DAGRunClearBody'
+ responses:
+ '200':
+ description: Successful Response
+ content:
+ application/json:
+ schema:
+ anyOf:
+ - $ref: '#/components/schemas/TaskInstanceCollectionResponse'
+ - $ref: '#/components/schemas/DAGRunResponse'
+ title: Response Clear Dag Run
+ '401':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Unauthorized
+ '403':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Forbidden
+ '404':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Not Found
+ '422':
+ description: Validation Error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPValidationError'
/public/dagSources/{file_token}:
get:
tags:
@@ -4383,6 +4442,15 @@ components:
- file_token
title: DAGResponse
description: DAG serializer for responses.
+ DAGRunClearBody:
+ properties:
+ dry_run:
+ type: boolean
+ title: Dry Run
+ default: true
+ type: object
+ title: DAGRunClearBody
+ description: DAG Run serializer for clear endpoint body.
DAGRunPatchBody:
properties:
state:
diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py
b/airflow/api_fastapi/core_api/routes/public/dag_run.py
index 810896806ee..d95cf76f69a 100644
--- a/airflow/api_fastapi/core_api/routes/public/dag_run.py
+++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -31,10 +31,15 @@ from airflow.api.common.mark_tasks import (
from airflow.api_fastapi.common.db.common import get_session
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.datamodels.dag_run import (
+ DAGRunClearBody,
DAGRunPatchBody,
DAGRunPatchStates,
DAGRunResponse,
)
+from airflow.api_fastapi.core_api.datamodels.task_instances import (
+ TaskInstanceCollectionResponse,
+ TaskInstanceResponse,
+)
from airflow.api_fastapi.core_api.openapi.exceptions import
create_openapi_http_exception_doc
from airflow.models import DAG, DagRun
@@ -142,3 +147,51 @@ def patch_dag_run(
dag_run = session.get(DagRun, dag_run.id)
return DAGRunResponse.model_validate(dag_run, from_attributes=True)
+
+
+@dag_run_router.post(
+ "/{dag_run_id}/clear",
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND])
+)
+def clear_dag_run(
+ dag_id: str,
+ dag_run_id: str,
+ body: DAGRunClearBody,
+ request: Request,
+ session: Annotated[Session, Depends(get_session)],
+) -> TaskInstanceCollectionResponse | DAGRunResponse:
+ dag_run = session.scalar(select(DagRun).filter_by(dag_id=dag_id,
run_id=dag_run_id))
+ if dag_run is None:
+ raise HTTPException(
+ 404, f"The DagRun with dag_id: `{dag_id}` and run_id:
`{dag_run_id}` was not found"
+ )
+
+ dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
+ start_date = dag_run.logical_date
+ end_date = dag_run.logical_date
+
+ if body.dry_run:
+ task_instances = dag.clear(
+ start_date=start_date,
+ end_date=end_date,
+ task_ids=None,
+ only_failed=False,
+ dry_run=True,
+ session=session,
+ )
+
+ return TaskInstanceCollectionResponse(
+ task_instances=[
+ TaskInstanceResponse.model_validate(ti, from_attributes=True)
for ti in task_instances
+ ],
+ total_entries=len(task_instances),
+ )
+ else:
+ dag.clear(
+ start_date=dag_run.start_date,
+ end_date=dag_run.end_date,
+ task_ids=None,
+ only_failed=False,
+ session=session,
+ )
+ dag_run_cleared = session.scalar(select(DagRun).where(DagRun.id ==
dag_run.id))
+ return DAGRunResponse.model_validate(dag_run_cleared,
from_attributes=True)
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index e48ec0a9a9c..8ca80d019e0 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1404,6 +1404,40 @@ class DAG(TaskSDKDag, LoggingMixin):
return altered
+ @overload
+ def clear(
+ self,
+ *,
+ dry_run: Literal[True],
+ task_ids: Collection[str | tuple[str, int]] | None = None,
+ start_date: datetime | None = None,
+ end_date: datetime | None = None,
+ only_failed: bool = False,
+ only_running: bool = False,
+ confirm_prompt: bool = False,
+ dag_run_state: DagRunState = DagRunState.QUEUED,
+ session: Session = NEW_SESSION,
+ dag_bag: DagBag | None = None,
+ exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None =
frozenset(),
+ ) -> list[TaskInstance]: ... # pragma: no cover
+
+ @overload
+ def clear(
+ self,
+ *,
+ task_ids: Collection[str | tuple[str, int]] | None = None,
+ start_date: datetime | None = None,
+ end_date: datetime | None = None,
+ only_failed: bool = False,
+ only_running: bool = False,
+ confirm_prompt: bool = False,
+ dag_run_state: DagRunState = DagRunState.QUEUED,
+ dry_run: Literal[False] = False,
+ session: Session = NEW_SESSION,
+ dag_bag: DagBag | None = None,
+ exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None =
frozenset(),
+ ) -> int: ... # pragma: no cover
+
@provide_session
def clear(
self,
@@ -1418,7 +1452,7 @@ class DAG(TaskSDKDag, LoggingMixin):
session: Session = NEW_SESSION,
dag_bag: DagBag | None = None,
exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None =
frozenset(),
- ) -> int | Iterable[TaskInstance]:
+ ) -> int | list[TaskInstance]:
"""
Clear a set of task instances associated with the current dag for a
specified date range.
diff --git a/airflow/ui/openapi-gen/queries/common.ts
b/airflow/ui/openapi-gen/queries/common.ts
index 7758a94a410..d1bb5661434 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -1054,6 +1054,9 @@ export type ConnectionServicePostConnectionMutationResult
= Awaited<
export type ConnectionServiceTestConnectionMutationResult = Awaited<
ReturnType<typeof ConnectionService.testConnection>
>;
+export type DagRunServiceClearDagRunMutationResult = Awaited<
+ ReturnType<typeof DagRunService.clearDagRun>
+>;
export type PoolServicePostPoolMutationResult = Awaited<
ReturnType<typeof PoolService.postPool>
>;
diff --git a/airflow/ui/openapi-gen/queries/queries.ts
b/airflow/ui/openapi-gen/queries/queries.ts
index 3b57095183a..dc00175ba4d 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -33,6 +33,7 @@ import {
BackfillPostBody,
ConnectionBody,
DAGPatchBody,
+ DAGRunClearBody,
DAGRunPatchBody,
DagRunState,
DagWarningType,
@@ -1814,6 +1815,52 @@ export const useConnectionServiceTestConnection = <
}) as unknown as Promise<TData>,
...options,
});
+/**
+ * Clear Dag Run
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.dagRunId
+ * @param data.requestBody
+ * @returns unknown Successful Response
+ * @throws ApiError
+ */
+export const useDagRunServiceClearDagRun = <
+ TData = Common.DagRunServiceClearDagRunMutationResult,
+ TError = unknown,
+ TContext = unknown,
+>(
+ options?: Omit<
+ UseMutationOptions<
+ TData,
+ TError,
+ {
+ dagId: string;
+ dagRunId: string;
+ requestBody: DAGRunClearBody;
+ },
+ TContext
+ >,
+ "mutationFn"
+ >,
+) =>
+ useMutation<
+ TData,
+ TError,
+ {
+ dagId: string;
+ dagRunId: string;
+ requestBody: DAGRunClearBody;
+ },
+ TContext
+ >({
+ mutationFn: ({ dagId, dagRunId, requestBody }) =>
+ DagRunService.clearDagRun({
+ dagId,
+ dagRunId,
+ requestBody,
+ }) as unknown as Promise<TData>,
+ ...options,
+ });
/**
* Post Pool
* Create a Pool.
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 8a8a50bb743..2011934848b 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -1303,6 +1303,19 @@ export const $DAGResponse = {
description: "DAG serializer for responses.",
} as const;
+export const $DAGRunClearBody = {
+ properties: {
+ dry_run: {
+ type: "boolean",
+ title: "Dry Run",
+ default: true,
+ },
+ },
+ type: "object",
+ title: "DAGRunClearBody",
+ description: "DAG Run serializer for clear endpoint body.",
+} as const;
+
export const $DAGRunPatchBody = {
properties: {
state: {
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts
b/airflow/ui/openapi-gen/requests/services.gen.ts
index 2cd106a8e08..d5efbd4ab2f 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -45,6 +45,8 @@ import type {
DeleteDagRunResponse,
PatchDagRunData,
PatchDagRunResponse,
+ ClearDagRunData,
+ ClearDagRunResponse,
GetDagSourceData,
GetDagSourceResponse,
GetDagStatsData,
@@ -735,6 +737,36 @@ export class DagRunService {
},
});
}
+
+ /**
+ * Clear Dag Run
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.dagRunId
+ * @param data.requestBody
+ * @returns unknown Successful Response
+ * @throws ApiError
+ */
+ public static clearDagRun(
+ data: ClearDagRunData,
+ ): CancelablePromise<ClearDagRunResponse> {
+ return __request(OpenAPI, {
+ method: "POST",
+ url: "/public/dags/{dag_id}/dagRuns/{dag_run_id}/clear",
+ path: {
+ dag_id: data.dagId,
+ dag_run_id: data.dagRunId,
+ },
+ body: data.requestBody,
+ mediaType: "application/json",
+ errors: {
+ 401: "Unauthorized",
+ 403: "Forbidden",
+ 404: "Not Found",
+ 422: "Validation Error",
+ },
+ });
+ }
}
export class DagSourceService {
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow/ui/openapi-gen/requests/types.gen.ts
index 35848f3a88f..a1ee89237a8 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -273,6 +273,13 @@ export type DAGResponse = {
readonly file_token: string;
};
+/**
+ * DAG Run serializer for clear endpoint body.
+ */
+export type DAGRunClearBody = {
+ dry_run?: boolean;
+};
+
/**
* DAG Run Serializer for PATCH requests.
*/
@@ -1114,6 +1121,16 @@ export type PatchDagRunData = {
export type PatchDagRunResponse = DAGRunResponse;
+export type ClearDagRunData = {
+ dagId: string;
+ dagRunId: string;
+ requestBody: DAGRunClearBody;
+};
+
+export type ClearDagRunResponse =
+ | TaskInstanceCollectionResponse
+ | DAGRunResponse;
+
export type GetDagSourceData = {
accept?: string;
fileToken: string;
@@ -1978,6 +1995,33 @@ export type $OpenApiTs = {
};
};
};
+ "/public/dags/{dag_id}/dagRuns/{dag_run_id}/clear": {
+ post: {
+ req: ClearDagRunData;
+ res: {
+ /**
+ * Successful Response
+ */
+ 200: TaskInstanceCollectionResponse | DAGRunResponse;
+ /**
+ * Unauthorized
+ */
+ 401: HTTPExceptionResponse;
+ /**
+ * Forbidden
+ */
+ 403: HTTPExceptionResponse;
+ /**
+ * Not Found
+ */
+ 404: HTTPExceptionResponse;
+ /**
+ * Validation Error
+ */
+ 422: HTTPValidationError;
+ };
+ };
+ };
"/public/dagSources/{file_token}": {
get: {
req: GetDagSourceData;
diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py
b/tests/api_fastapi/core_api/routes/public/test_dag_run.py
index 64c3512e88b..eec6955b788 100644
--- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -20,10 +20,12 @@ from __future__ import annotations
from datetime import datetime, timezone
import pytest
+from sqlalchemy import select
+from airflow.models import DagRun
from airflow.operators.empty import EmptyOperator
from airflow.utils.session import provide_session
-from airflow.utils.state import DagRunState
+from airflow.utils.state import DagRunState, State
from airflow.utils.types import DagRunTriggeredByType, DagRunType
from tests_common.test_utils.db import clear_db_dags, clear_db_runs,
clear_db_serialized_dags
@@ -65,15 +67,20 @@ def setup(dag_maker, session=None):
schedule="@daily",
start_date=START_DATE,
):
- EmptyOperator(task_id="task_1")
+ task1 = EmptyOperator(task_id="task_1")
dag_run1 = dag_maker.create_dagrun(
run_id=DAG1_RUN1_ID,
state=DAG1_RUN1_STATE,
run_type=DAG1_RUN1_RUN_TYPE,
triggered_by=DAG1_RUN1_TRIGGERED_BY,
)
+
dag_run1.note = (DAG1_RUN1_NOTE, 1)
+ ti1 = dag_run1.get_task_instance(task_id="task_1")
+ ti1.task = task1
+ ti1.state = State.SUCCESS
+
dag_maker.create_dagrun(
run_id=DAG1_RUN2_ID,
state=DAG1_RUN2_STATE,
@@ -106,6 +113,7 @@ def setup(dag_maker, session=None):
dag_maker.dagbag.sync_to_db()
dag_maker.dag_model
dag_maker.dag_model.has_task_concurrency_limits = True
+ session.merge(ti1)
session.merge(dag_maker.dag_model)
session.commit()
@@ -254,3 +262,42 @@ class TestDeleteDagRun:
assert response.status_code == 404
body = response.json()
assert body["detail"] == "The DagRun with dag_id: `test_dag1` and
run_id: `invalid` was not found"
+
+
+class TestClearDagRun:
+ def test_clear_dag_run(self, test_client):
+ response = test_client.post(
+ f"/public/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/clear",
json={"dry_run": False}
+ )
+ assert response.status_code == 200
+ body = response.json()
+ assert body["dag_id"] == DAG1_ID
+ assert body["run_id"] == DAG1_RUN1_ID
+ assert body["state"] == "queued"
+
+ @pytest.mark.parametrize(
+ "body",
+ [{"dry_run": True}, {}],
+ )
+ def test_clear_dag_run_dry_run(self, test_client, session, body):
+ response =
test_client.post(f"/public/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/clear",
json=body)
+ assert response.status_code == 200
+ body = response.json()
+ assert body["total_entries"] == 1
+ for each in body["task_instances"]:
+ assert each["state"] == "success"
+ dag_run = session.scalar(select(DagRun).filter_by(dag_id=DAG1_ID,
run_id=DAG1_RUN1_ID))
+ assert dag_run.state == DAG1_RUN1_STATE
+
+ def test_clear_dag_run_not_found(self, test_client):
+ response =
test_client.post(f"/public/dags/{DAG1_ID}/dagRuns/invalid/clear",
json={"dry_run": False})
+ assert response.status_code == 404
+ body = response.json()
+ assert body["detail"] == "The DagRun with dag_id: `test_dag1` and
run_id: `invalid` was not found"
+
+ def test_clear_dag_run_unprocessable_entity(self, test_client):
+ response =
test_client.post(f"/public/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/clear")
+ assert response.status_code == 422
+ body = response.json()
+ assert body["detail"][0]["msg"] == "Field required"
+ assert body["detail"][0]["loc"][0] == "body"