This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6c9c32710e Migrate the public endpoint Delete DAG to FastAPI (#42914)
6c9c32710e is described below
commit 6c9c32710e1ebbfb244216763239169e5c972d02
Author: Omkar P <[email protected]>
AuthorDate: Tue Oct 15 14:15:51 2024 +0530
Migrate the public endpoint Delete DAG to FastAPI (#42914)
* Migrate the public endpoint Delete DAG to FastAPI
* Refactor tests
---
airflow/api_connexion/endpoints/dag_endpoint.py | 2 +
airflow/api_fastapi/openapi/v1-generated.yaml | 49 +++++
airflow/api_fastapi/views/public/dags.py | 19 +-
airflow/ui/openapi-gen/queries/common.ts | 3 +
airflow/ui/openapi-gen/queries/queries.ts | 37 ++++
airflow/ui/openapi-gen/requests/services.gen.ts | 29 +++
airflow/ui/openapi-gen/requests/types.gen.ts | 35 ++++
airflow/utils/api_migration.py | 2 +-
tests/api_fastapi/views/public/test_dags.py | 242 ++++++++++++++++--------
9 files changed, 336 insertions(+), 82 deletions(-)
diff --git a/airflow/api_connexion/endpoints/dag_endpoint.py b/airflow/api_connexion/endpoints/dag_endpoint.py
index 3d0d3dd8bf..0352297bff 100644
--- a/airflow/api_connexion/endpoints/dag_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_endpoint.py
@@ -52,6 +52,7 @@ if TYPE_CHECKING:
from airflow.api_connexion.types import APIResponse, UpdateMask
+@mark_fastapi_migration_done
@security.requires_access_dag("GET")
@provide_session
def get_dag(
@@ -215,6 +216,7 @@ def patch_dags(limit, session, offset=0, only_active=True,
tags=None, dag_id_pat
return dags_collection_schema.dump(DAGCollection(dags=dags,
total_entries=total_entries))
+@mark_fastapi_migration_done
@security.requires_access_dag("DELETE")
@action_logging
@provide_session
diff --git a/airflow/api_fastapi/openapi/v1-generated.yaml b/airflow/api_fastapi/openapi/v1-generated.yaml
index 235410a6d3..56f48c73e9 100644
--- a/airflow/api_fastapi/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/openapi/v1-generated.yaml
@@ -408,6 +408,55 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
+ delete:
+ tags:
+ - DAG
+ summary: Delete Dag
+ description: Delete the specific DAG.
+ operationId: delete_dag
+ parameters:
+ - name: dag_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Id
+ responses:
+ '200':
+ description: Successful Response
+ content:
+ application/json:
+ schema: {}
+ '400':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Bad Request
+ '401':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Unauthorized
+ '403':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Forbidden
+ '404':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Not Found
+ '422':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Unprocessable Entity
/public/dags/{dag_id}/details:
get:
tags:
diff --git a/airflow/api_fastapi/views/public/dags.py b/airflow/api_fastapi/views/public/dags.py
index ca0f44162e..eb8233a7f7 100644
--- a/airflow/api_fastapi/views/public/dags.py
+++ b/airflow/api_fastapi/views/public/dags.py
@@ -17,11 +17,12 @@
from __future__ import annotations
-from fastapi import Depends, HTTPException, Query, Request
+from fastapi import Depends, HTTPException, Query, Request, Response
from sqlalchemy import update
from sqlalchemy.orm import Session
from typing_extensions import Annotated
+from airflow.api.common import delete_dag as delete_dag_module
from airflow.api_fastapi.db.common import (
get_session,
paginated_select,
@@ -48,6 +49,7 @@ from airflow.api_fastapi.serializers.dags import (
DAGResponse,
)
from airflow.api_fastapi.views.router import AirflowRouter
+from airflow.exceptions import AirflowException, DagNotFound
from airflow.models import DAG, DagModel
dags_router = AirflowRouter(tags=["DAG"], prefix="/dags")
@@ -204,3 +206,18 @@ async def patch_dags(
dags=[DAGResponse.model_validate(dag, from_attributes=True) for dag in
dags],
total_entries=total_entries,
)
+
+
+@dags_router.delete("/{dag_id}", responses=create_openapi_http_exception_doc([400, 401, 403, 404, 422]))
+async def delete_dag(
+ dag_id: str,
+ session: Annotated[Session, Depends(get_session)],
+) -> Response:
+ """Delete the specific DAG."""
+ try:
+ delete_dag_module.delete_dag(dag_id, session=session)
+ except DagNotFound:
+ raise HTTPException(404, f"Dag with id: {dag_id} was not found")
+ except AirflowException:
+ raise HTTPException(409, f"Task instances of dag with id: '{dag_id}' are still running")
+ return Response(status_code=204)
diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts
index 426e28447f..2f1c6a78d9 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -197,6 +197,9 @@ export type DagServicePatchDagMutationResult = Awaited<
export type VariableServicePatchVariableMutationResult = Awaited<
ReturnType<typeof VariableService.patchVariable>
>;
+export type DagServiceDeleteDagMutationResult = Awaited<
+ ReturnType<typeof DagService.deleteDag>
+>;
export type ConnectionServiceDeleteConnectionMutationResult = Awaited<
ReturnType<typeof ConnectionService.deleteConnection>
>;
diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts
index 557a7ba8ff..a16bdf165b 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -514,6 +514,43 @@ export const useVariableServicePatchVariable = <
}) as unknown as Promise<TData>,
...options,
});
+/**
+ * Delete Dag
+ * Delete the specific DAG.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @returns unknown Successful Response
+ * @throws ApiError
+ */
+export const useDagServiceDeleteDag = <
+ TData = Common.DagServiceDeleteDagMutationResult,
+ TError = unknown,
+ TContext = unknown,
+>(
+ options?: Omit<
+ UseMutationOptions<
+ TData,
+ TError,
+ {
+ dagId: string;
+ },
+ TContext
+ >,
+ "mutationFn"
+ >,
+) =>
+ useMutation<
+ TData,
+ TError,
+ {
+ dagId: string;
+ },
+ TContext
+ >({
+ mutationFn: ({ dagId }) =>
+ DagService.deleteDag({ dagId }) as unknown as Promise<TData>,
+ ...options,
+ });
/**
* Delete Connection
* Delete a connection entry.
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts
index 78b113c7f2..8d7f0cee2b 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -15,6 +15,8 @@ import type {
GetDagResponse,
PatchDagData,
PatchDagResponse,
+ DeleteDagData,
+ DeleteDagResponse,
GetDagDetailsData,
GetDagDetailsResponse,
DeleteConnectionData,
@@ -234,6 +236,33 @@ export class DagService {
});
}
+ /**
+ * Delete Dag
+ * Delete the specific DAG.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @returns unknown Successful Response
+ * @throws ApiError
+ */
+ public static deleteDag(
+ data: DeleteDagData,
+ ): CancelablePromise<DeleteDagResponse> {
+ return __request(OpenAPI, {
+ method: "DELETE",
+ url: "/public/dags/{dag_id}",
+ path: {
+ dag_id: data.dagId,
+ },
+ errors: {
+ 400: "Bad Request",
+ 401: "Unauthorized",
+ 403: "Forbidden",
+ 404: "Not Found",
+ 422: "Unprocessable Entity",
+ },
+ });
+ }
+
/**
* Get Dag Details
* Get details of DAG.
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts
index 856517d560..7f603a1adb 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -327,6 +327,12 @@ export type PatchDagData = {
export type PatchDagResponse = DAGResponse;
+export type DeleteDagData = {
+ dagId: string;
+};
+
+export type DeleteDagResponse = unknown;
+
export type GetDagDetailsData = {
dagId: string;
};
@@ -525,6 +531,35 @@ export type $OpenApiTs = {
422: HTTPValidationError;
};
};
+ delete: {
+ req: DeleteDagData;
+ res: {
+ /**
+ * Successful Response
+ */
+ 200: unknown;
+ /**
+ * Bad Request
+ */
+ 400: HTTPExceptionResponse;
+ /**
+ * Unauthorized
+ */
+ 401: HTTPExceptionResponse;
+ /**
+ * Forbidden
+ */
+ 403: HTTPExceptionResponse;
+ /**
+ * Not Found
+ */
+ 404: HTTPExceptionResponse;
+ /**
+ * Unprocessable Entity
+ */
+ 422: HTTPExceptionResponse;
+ };
+ };
};
"/public/dags/{dag_id}/details": {
get: {
diff --git a/airflow/utils/api_migration.py b/airflow/utils/api_migration.py
index d6b61a933d..3e6ba3881c 100644
--- a/airflow/utils/api_migration.py
+++ b/airflow/utils/api_migration.py
@@ -31,7 +31,7 @@ PS = ParamSpec("PS")
RT = TypeVar("RT")
-def mark_fastapi_migration_done(function: Callable[PS, RT]) -> Callable[PS, RT]:
+def mark_fastapi_migration_done(function: Callable[..., RT]) -> Callable[..., RT]:
"""
Mark an endpoint as migrated over to the new FastAPI API.
diff --git a/tests/api_fastapi/views/public/test_dags.py b/tests/api_fastapi/views/public/test_dags.py
index cd1809cb70..ab0c54f517 100644
--- a/tests/api_fastapi/views/public/test_dags.py
+++ b/tests/api_fastapi/views/public/test_dags.py
@@ -25,7 +25,7 @@ from airflow.models.dag import DagModel
from airflow.models.dagrun import DagRun
from airflow.operators.empty import EmptyOperator
from airflow.utils.session import provide_session
-from airflow.utils.state import DagRunState
+from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils.types import DagRunType
from tests_common.test_utils.db import clear_db_dags, clear_db_runs,
clear_db_serialized_dags
@@ -36,84 +36,100 @@ DAG1_DISPLAY_NAME = "display1"
DAG2_ID = "test_dag2"
DAG2_START_DATE = datetime(2021, 6, 15, tzinfo=timezone.utc)
DAG3_ID = "test_dag3"
+DAG4_ID = "test_dag4"
+DAG4_DISPLAY_NAME = "display4"
+DAG5_ID = "test_dag5"
+DAG5_DISPLAY_NAME = "display5"
TASK_ID = "op1"
UTC_JSON_REPR = "UTC" if pendulum.__version__.startswith("3") else
"Timezone('UTC')"
+API_PREFIX = "/public/dags"
+
+
+class TestDagEndpoint:
+ """Common class for /public/dags related unit tests."""
+
+ @staticmethod
+ def _clear_db():
+ clear_db_runs()
+ clear_db_dags()
+ clear_db_serialized_dags()
+
+ def _create_deactivated_paused_dag(self, session=None):
+ dag_model = DagModel(
+ dag_id=DAG3_ID,
+ fileloc="/tmp/dag_del_1.py",
+ timetable_summary="2 2 * * *",
+ is_active=False,
+ is_paused=True,
+ owners="test_owner,another_test_owner",
+ next_dagrun=datetime(2021, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
+ )
+
+ dagrun_failed = DagRun(
+ dag_id=DAG3_ID,
+ run_id="run1",
+ execution_date=datetime(2018, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
+ start_date=datetime(2018, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
+ run_type=DagRunType.SCHEDULED,
+ state=DagRunState.FAILED,
+ )
+
+ dagrun_success = DagRun(
+ dag_id=DAG3_ID,
+ run_id="run2",
+ execution_date=datetime(2019, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
+ start_date=datetime(2019, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
+ run_type=DagRunType.MANUAL,
+ state=DagRunState.SUCCESS,
+ )
+
+ session.add(dag_model)
+ session.add(dagrun_failed)
+ session.add(dagrun_success)
+
+ @pytest.fixture(autouse=True)
+ @provide_session
+ def setup(self, dag_maker, session=None) -> None:
+ self._clear_db()
+
+ with dag_maker(
+ DAG1_ID,
+ dag_display_name=DAG1_DISPLAY_NAME,
+ schedule=None,
+ start_date=datetime(2018, 6, 15, 0, 0, tzinfo=timezone.utc),
+ doc_md="details",
+ params={"foo": 1},
+ tags=["example"],
+ ):
+ EmptyOperator(task_id=TASK_ID)
+
+ dag_maker.create_dagrun(state=DagRunState.FAILED)
+
+ with dag_maker(
+ DAG2_ID,
+ schedule=None,
+ start_date=DAG2_START_DATE,
+ doc_md="details",
+ params={"foo": 1},
+ max_active_tasks=16,
+ max_active_runs=16,
+ ):
+ EmptyOperator(task_id=TASK_ID)
+
+ self._create_deactivated_paused_dag(session)
+
+ dag_maker.dagbag.sync_to_db()
+ dag_maker.dag_model.has_task_concurrency_limits = True
+ session.merge(dag_maker.dag_model)
+ session.commit()
+
+ def teardown_method(self) -> None:
+ self._clear_db()
+
+
+class TestGetDags(TestDagEndpoint):
+ """Unit tests for Get DAGs."""
-
-@provide_session
-def _create_deactivated_paused_dag(session=None):
- dag_model = DagModel(
- dag_id=DAG3_ID,
- fileloc="/tmp/dag_del_1.py",
- timetable_summary="2 2 * * *",
- is_active=False,
- is_paused=True,
- owners="test_owner,another_test_owner",
- next_dagrun=datetime(2021, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
- )
-
- dagrun_failed = DagRun(
- dag_id=DAG3_ID,
- run_id="run1",
- execution_date=datetime(2018, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
- start_date=datetime(2018, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
- run_type=DagRunType.SCHEDULED,
- state=DagRunState.FAILED,
- )
-
- dagrun_success = DagRun(
- dag_id=DAG3_ID,
- run_id="run2",
- execution_date=datetime(2019, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
- start_date=datetime(2019, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
- run_type=DagRunType.MANUAL,
- state=DagRunState.SUCCESS,
- )
-
- session.add(dag_model)
- session.add(dagrun_failed)
- session.add(dagrun_success)
-
-
[email protected](autouse=True)
-@provide_session
-def setup(dag_maker, session=None) -> None:
- clear_db_runs()
- clear_db_dags()
- clear_db_serialized_dags()
-
- with dag_maker(
- DAG1_ID,
- dag_display_name=DAG1_DISPLAY_NAME,
- schedule=None,
- start_date=datetime(2018, 6, 15, 0, 0, tzinfo=timezone.utc),
- doc_md="details",
- params={"foo": 1},
- tags=["example"],
- ):
- EmptyOperator(task_id=TASK_ID)
-
- dag_maker.create_dagrun(state=DagRunState.FAILED)
-
- with dag_maker(
- DAG2_ID,
- schedule=None,
- start_date=DAG2_START_DATE,
- doc_md="details",
- params={"foo": 1},
- max_active_tasks=16,
- max_active_runs=16,
- ):
- EmptyOperator(task_id=TASK_ID)
-
- dag_maker.dagbag.sync_to_db()
- dag_maker.dag_model.has_task_concurrency_limits = True
- session.merge(dag_maker.dag_model)
- session.commit()
- _create_deactivated_paused_dag()
-
-
-class TestGetDags:
@pytest.mark.parametrize(
"query_params, expected_total_entries, expected_ids",
[
@@ -161,7 +177,9 @@ class TestGetDags:
assert [dag["dag_id"] for dag in body["dags"]] == expected_ids
-class TestPatchDag:
+class TestPatchDag(TestDagEndpoint):
+ """Unit tests for Patch DAG."""
+
@pytest.mark.parametrize(
"query_params, dag_id, body, expected_status_code, expected_is_paused",
[
@@ -184,7 +202,9 @@ class TestPatchDag:
assert body["is_paused"] == expected_is_paused
-class TestPatchDags:
+class TestPatchDags(TestDagEndpoint):
+ """Unit tests for Patch DAGs."""
+
@pytest.mark.parametrize(
"query_params, body, expected_status_code, expected_ids,
expected_paused_ids",
[
@@ -239,7 +259,9 @@ class TestPatchDags:
assert paused_dag_ids == expected_paused_ids
-class TestDagDetails:
+class TestDagDetails(TestDagEndpoint):
+ """Unit tests for DAG Details."""
+
@pytest.mark.parametrize(
"query_params, dag_id, expected_status_code, dag_display_name,
start_date",
[
@@ -312,7 +334,9 @@ class TestDagDetails:
assert res_json == expected
-class TestGetDag:
+class TestGetDag(TestDagEndpoint):
+ """Unit tests for Get DAG."""
+
@pytest.mark.parametrize(
"query_params, dag_id, expected_status_code, dag_display_name",
[
@@ -359,3 +383,61 @@ class TestGetDag:
"pickle_id": None,
}
assert res_json == expected
+
+
+class TestDeleteDAG(TestDagEndpoint):
+ """Unit tests for Delete DAG."""
+
+ def _create_dag_for_deletion(
+ self,
+ dag_maker,
+ dag_id=None,
+ dag_display_name=None,
+ has_running_dagruns=False,
+ ):
+ with dag_maker(
+ dag_id,
+ dag_display_name=dag_display_name,
+ start_date=datetime(2024, 10, 10, tzinfo=timezone.utc),
+ ):
+ EmptyOperator(task_id="dummy")
+
+ if has_running_dagruns:
+ dr = dag_maker.create_dagrun()
+ ti = dr.get_task_instances()[0]
+ ti.set_state(TaskInstanceState.RUNNING)
+
+ dag_maker.dagbag.sync_to_db()
+
+ @pytest.mark.parametrize(
+ "dag_id, dag_display_name, status_code_delete, status_code_details,
has_running_dagruns, is_create_dag",
+ [
+ ("test_nonexistent_dag_id", "nonexistent_display_name", 404, 404,
False, False),
+ (DAG4_ID, DAG4_DISPLAY_NAME, 204, 404, False, True),
+ (DAG5_ID, DAG5_DISPLAY_NAME, 409, 200, True, True),
+ ],
+ )
+ def test_delete_dag(
+ self,
+ dag_maker,
+ test_client,
+ dag_id,
+ dag_display_name,
+ status_code_delete,
+ status_code_details,
+ has_running_dagruns,
+ is_create_dag,
+ ):
+ if is_create_dag:
+ self._create_dag_for_deletion(
+ dag_maker=dag_maker,
+ dag_id=dag_id,
+ dag_display_name=dag_display_name,
+ has_running_dagruns=has_running_dagruns,
+ )
+
+ delete_response = test_client.delete(f"{API_PREFIX}/{dag_id}")
+ assert delete_response.status_code == status_code_delete
+
+ details_response = test_client.get(f"{API_PREFIX}/{dag_id}/details")
+ assert details_response.status_code == status_code_details