This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c3aabba0488 AIP-84 Migrate Clear Dag Run public endpoint to FastAPI  
(#42975)
c3aabba0488 is described below

commit c3aabba04881e5d25fe3b6ef7727fb52d28b8a2c
Author: Kalyan R <[email protected]>
AuthorDate: Fri Nov 15 13:52:37 2024 +0530

    AIP-84 Migrate Clear Dag Run public endpoint to FastAPI  (#42975)
    
    * add clear_dag_run
    
    * add tests
    
    * Merge branch 'main' of https://github.com/apache/airflow into 
kalyan/API-84/clear_dag_run
    
    * add ti response
    
    * add
    
    * use logical_date
    
    * fix tests
    
    * remove async
    
    * Update airflow/api_fastapi/core_api/routes/public/dag_run.py
    
    Co-authored-by: Pierre Jeambrun <[email protected]>
    
    * Update airflow/api_fastapi/core_api/datamodels/dag_run.py
    
    Co-authored-by: Pierre Jeambrun <[email protected]>
    
    * remove type ignore
    
    * update ti state and assert it
    
    * reuse state
    
    * remove breakpoint
    
    * feedback
    
    ---------
    
    Co-authored-by: Pierre Jeambrun <[email protected]>
---
 .../api_connexion/endpoints/dag_run_endpoint.py    |  1 +
 airflow/api_fastapi/core_api/datamodels/dag_run.py |  6 ++
 .../api_fastapi/core_api/openapi/v1-generated.yaml | 68 ++++++++++++++++++++++
 .../api_fastapi/core_api/routes/public/dag_run.py  | 53 +++++++++++++++++
 airflow/models/dag.py                              | 36 +++++++++++-
 airflow/ui/openapi-gen/queries/common.ts           |  3 +
 airflow/ui/openapi-gen/queries/queries.ts          | 47 +++++++++++++++
 airflow/ui/openapi-gen/requests/schemas.gen.ts     | 13 +++++
 airflow/ui/openapi-gen/requests/services.gen.ts    | 32 ++++++++++
 airflow/ui/openapi-gen/requests/types.gen.ts       | 44 ++++++++++++++
 .../core_api/routes/public/test_dag_run.py         | 51 +++++++++++++++-
 11 files changed, 351 insertions(+), 3 deletions(-)

diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py 
b/airflow/api_connexion/endpoints/dag_run_endpoint.py
index 6a38eb27ff4..0c0a0322e62 100644
--- a/airflow/api_connexion/endpoints/dag_run_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -403,6 +403,7 @@ def update_dag_run_state(*, dag_id: str, dag_run_id: str, 
session: Session = NEW
     return dagrun_schema.dump(dag_run)
 
 
+@mark_fastapi_migration_done
 @security.requires_access_dag("PUT", DagAccessEntity.RUN)
 @action_logging
 @provide_session
diff --git a/airflow/api_fastapi/core_api/datamodels/dag_run.py 
b/airflow/api_fastapi/core_api/datamodels/dag_run.py
index 102567f6997..8241885aff2 100644
--- a/airflow/api_fastapi/core_api/datamodels/dag_run.py
+++ b/airflow/api_fastapi/core_api/datamodels/dag_run.py
@@ -41,6 +41,12 @@ class DAGRunPatchBody(BaseModel):
     note: str | None = Field(None, max_length=1000)
 
 
+class DAGRunClearBody(BaseModel):
+    """DAG Run serializer for clear endpoint body."""
+
+    dry_run: bool = True
+
+
 class DAGRunResponse(BaseModel):
     """DAG Run serializer for responses."""
 
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml 
b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index 890fb6b6c8d..56e40a7899a 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -1164,6 +1164,65 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/HTTPValidationError'
+  /public/dags/{dag_id}/dagRuns/{dag_run_id}/clear:
+    post:
+      tags:
+      - DagRun
+      summary: Clear Dag Run
+      operationId: clear_dag_run
+      parameters:
+      - name: dag_id
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Dag Id
+      - name: dag_run_id
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Dag Run Id
+      requestBody:
+        required: true
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/DAGRunClearBody'
+      responses:
+        '200':
+          description: Successful Response
+          content:
+            application/json:
+              schema:
+                anyOf:
+                - $ref: '#/components/schemas/TaskInstanceCollectionResponse'
+                - $ref: '#/components/schemas/DAGRunResponse'
+                title: Response Clear Dag Run
+        '401':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Unauthorized
+        '403':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Forbidden
+        '404':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Not Found
+        '422':
+          description: Validation Error
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPValidationError'
   /public/dagSources/{file_token}:
     get:
       tags:
@@ -4383,6 +4442,15 @@ components:
       - file_token
       title: DAGResponse
       description: DAG serializer for responses.
+    DAGRunClearBody:
+      properties:
+        dry_run:
+          type: boolean
+          title: Dry Run
+          default: true
+      type: object
+      title: DAGRunClearBody
+      description: DAG Run serializer for clear endpoint body.
     DAGRunPatchBody:
       properties:
         state:
diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py 
b/airflow/api_fastapi/core_api/routes/public/dag_run.py
index 810896806ee..d95cf76f69a 100644
--- a/airflow/api_fastapi/core_api/routes/public/dag_run.py
+++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -31,10 +31,15 @@ from airflow.api.common.mark_tasks import (
 from airflow.api_fastapi.common.db.common import get_session
 from airflow.api_fastapi.common.router import AirflowRouter
 from airflow.api_fastapi.core_api.datamodels.dag_run import (
+    DAGRunClearBody,
     DAGRunPatchBody,
     DAGRunPatchStates,
     DAGRunResponse,
 )
+from airflow.api_fastapi.core_api.datamodels.task_instances import (
+    TaskInstanceCollectionResponse,
+    TaskInstanceResponse,
+)
 from airflow.api_fastapi.core_api.openapi.exceptions import 
create_openapi_http_exception_doc
 from airflow.models import DAG, DagRun
 
@@ -142,3 +147,51 @@ def patch_dag_run(
     dag_run = session.get(DagRun, dag_run.id)
 
     return DAGRunResponse.model_validate(dag_run, from_attributes=True)
+
+
+@dag_run_router.post(
+    "/{dag_run_id}/clear", 
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND])
+)
+def clear_dag_run(
+    dag_id: str,
+    dag_run_id: str,
+    body: DAGRunClearBody,
+    request: Request,
+    session: Annotated[Session, Depends(get_session)],
+) -> TaskInstanceCollectionResponse | DAGRunResponse:
+    dag_run = session.scalar(select(DagRun).filter_by(dag_id=dag_id, 
run_id=dag_run_id))
+    if dag_run is None:
+        raise HTTPException(
+            404, f"The DagRun with dag_id: `{dag_id}` and run_id: 
`{dag_run_id}` was not found"
+        )
+
+    dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
+    start_date = dag_run.logical_date
+    end_date = dag_run.logical_date
+
+    if body.dry_run:
+        task_instances = dag.clear(
+            start_date=start_date,
+            end_date=end_date,
+            task_ids=None,
+            only_failed=False,
+            dry_run=True,
+            session=session,
+        )
+
+        return TaskInstanceCollectionResponse(
+            task_instances=[
+                TaskInstanceResponse.model_validate(ti, from_attributes=True) 
for ti in task_instances
+            ],
+            total_entries=len(task_instances),
+        )
+    else:
+        dag.clear(
+            start_date=dag_run.start_date,
+            end_date=dag_run.end_date,
+            task_ids=None,
+            only_failed=False,
+            session=session,
+        )
+        dag_run_cleared = session.scalar(select(DagRun).where(DagRun.id == 
dag_run.id))
+        return DAGRunResponse.model_validate(dag_run_cleared, 
from_attributes=True)
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index e48ec0a9a9c..8ca80d019e0 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1404,6 +1404,40 @@ class DAG(TaskSDKDag, LoggingMixin):
 
         return altered
 
+    @overload
+    def clear(
+        self,
+        *,
+        dry_run: Literal[True],
+        task_ids: Collection[str | tuple[str, int]] | None = None,
+        start_date: datetime | None = None,
+        end_date: datetime | None = None,
+        only_failed: bool = False,
+        only_running: bool = False,
+        confirm_prompt: bool = False,
+        dag_run_state: DagRunState = DagRunState.QUEUED,
+        session: Session = NEW_SESSION,
+        dag_bag: DagBag | None = None,
+        exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = 
frozenset(),
+    ) -> list[TaskInstance]: ...  # pragma: no cover
+
+    @overload
+    def clear(
+        self,
+        *,
+        task_ids: Collection[str | tuple[str, int]] | None = None,
+        start_date: datetime | None = None,
+        end_date: datetime | None = None,
+        only_failed: bool = False,
+        only_running: bool = False,
+        confirm_prompt: bool = False,
+        dag_run_state: DagRunState = DagRunState.QUEUED,
+        dry_run: Literal[False] = False,
+        session: Session = NEW_SESSION,
+        dag_bag: DagBag | None = None,
+        exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = 
frozenset(),
+    ) -> int: ...  # pragma: no cover
+
     @provide_session
     def clear(
         self,
@@ -1418,7 +1452,7 @@ class DAG(TaskSDKDag, LoggingMixin):
         session: Session = NEW_SESSION,
         dag_bag: DagBag | None = None,
         exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = 
frozenset(),
-    ) -> int | Iterable[TaskInstance]:
+    ) -> int | list[TaskInstance]:
         """
         Clear a set of task instances associated with the current dag for a 
specified date range.
 
diff --git a/airflow/ui/openapi-gen/queries/common.ts 
b/airflow/ui/openapi-gen/queries/common.ts
index 7758a94a410..d1bb5661434 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -1054,6 +1054,9 @@ export type ConnectionServicePostConnectionMutationResult 
= Awaited<
 export type ConnectionServiceTestConnectionMutationResult = Awaited<
   ReturnType<typeof ConnectionService.testConnection>
 >;
+export type DagRunServiceClearDagRunMutationResult = Awaited<
+  ReturnType<typeof DagRunService.clearDagRun>
+>;
 export type PoolServicePostPoolMutationResult = Awaited<
   ReturnType<typeof PoolService.postPool>
 >;
diff --git a/airflow/ui/openapi-gen/queries/queries.ts 
b/airflow/ui/openapi-gen/queries/queries.ts
index 3b57095183a..dc00175ba4d 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -33,6 +33,7 @@ import {
   BackfillPostBody,
   ConnectionBody,
   DAGPatchBody,
+  DAGRunClearBody,
   DAGRunPatchBody,
   DagRunState,
   DagWarningType,
@@ -1814,6 +1815,52 @@ export const useConnectionServiceTestConnection = <
       }) as unknown as Promise<TData>,
     ...options,
   });
+/**
+ * Clear Dag Run
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.dagRunId
+ * @param data.requestBody
+ * @returns unknown Successful Response
+ * @throws ApiError
+ */
+export const useDagRunServiceClearDagRun = <
+  TData = Common.DagRunServiceClearDagRunMutationResult,
+  TError = unknown,
+  TContext = unknown,
+>(
+  options?: Omit<
+    UseMutationOptions<
+      TData,
+      TError,
+      {
+        dagId: string;
+        dagRunId: string;
+        requestBody: DAGRunClearBody;
+      },
+      TContext
+    >,
+    "mutationFn"
+  >,
+) =>
+  useMutation<
+    TData,
+    TError,
+    {
+      dagId: string;
+      dagRunId: string;
+      requestBody: DAGRunClearBody;
+    },
+    TContext
+  >({
+    mutationFn: ({ dagId, dagRunId, requestBody }) =>
+      DagRunService.clearDagRun({
+        dagId,
+        dagRunId,
+        requestBody,
+      }) as unknown as Promise<TData>,
+    ...options,
+  });
 /**
  * Post Pool
  * Create a Pool.
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts 
b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 8a8a50bb743..2011934848b 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -1303,6 +1303,19 @@ export const $DAGResponse = {
   description: "DAG serializer for responses.",
 } as const;
 
+export const $DAGRunClearBody = {
+  properties: {
+    dry_run: {
+      type: "boolean",
+      title: "Dry Run",
+      default: true,
+    },
+  },
+  type: "object",
+  title: "DAGRunClearBody",
+  description: "DAG Run serializer for clear endpoint body.",
+} as const;
+
 export const $DAGRunPatchBody = {
   properties: {
     state: {
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts 
b/airflow/ui/openapi-gen/requests/services.gen.ts
index 2cd106a8e08..d5efbd4ab2f 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -45,6 +45,8 @@ import type {
   DeleteDagRunResponse,
   PatchDagRunData,
   PatchDagRunResponse,
+  ClearDagRunData,
+  ClearDagRunResponse,
   GetDagSourceData,
   GetDagSourceResponse,
   GetDagStatsData,
@@ -735,6 +737,36 @@ export class DagRunService {
       },
     });
   }
+
+  /**
+   * Clear Dag Run
+   * @param data The data for the request.
+   * @param data.dagId
+   * @param data.dagRunId
+   * @param data.requestBody
+   * @returns unknown Successful Response
+   * @throws ApiError
+   */
+  public static clearDagRun(
+    data: ClearDagRunData,
+  ): CancelablePromise<ClearDagRunResponse> {
+    return __request(OpenAPI, {
+      method: "POST",
+      url: "/public/dags/{dag_id}/dagRuns/{dag_run_id}/clear",
+      path: {
+        dag_id: data.dagId,
+        dag_run_id: data.dagRunId,
+      },
+      body: data.requestBody,
+      mediaType: "application/json",
+      errors: {
+        401: "Unauthorized",
+        403: "Forbidden",
+        404: "Not Found",
+        422: "Validation Error",
+      },
+    });
+  }
 }
 
 export class DagSourceService {
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts 
b/airflow/ui/openapi-gen/requests/types.gen.ts
index 35848f3a88f..a1ee89237a8 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -273,6 +273,13 @@ export type DAGResponse = {
   readonly file_token: string;
 };
 
+/**
+ * DAG Run serializer for clear endpoint body.
+ */
+export type DAGRunClearBody = {
+  dry_run?: boolean;
+};
+
 /**
  * DAG Run Serializer for PATCH requests.
  */
@@ -1114,6 +1121,16 @@ export type PatchDagRunData = {
 
 export type PatchDagRunResponse = DAGRunResponse;
 
+export type ClearDagRunData = {
+  dagId: string;
+  dagRunId: string;
+  requestBody: DAGRunClearBody;
+};
+
+export type ClearDagRunResponse =
+  | TaskInstanceCollectionResponse
+  | DAGRunResponse;
+
 export type GetDagSourceData = {
   accept?: string;
   fileToken: string;
@@ -1978,6 +1995,33 @@ export type $OpenApiTs = {
       };
     };
   };
+  "/public/dags/{dag_id}/dagRuns/{dag_run_id}/clear": {
+    post: {
+      req: ClearDagRunData;
+      res: {
+        /**
+         * Successful Response
+         */
+        200: TaskInstanceCollectionResponse | DAGRunResponse;
+        /**
+         * Unauthorized
+         */
+        401: HTTPExceptionResponse;
+        /**
+         * Forbidden
+         */
+        403: HTTPExceptionResponse;
+        /**
+         * Not Found
+         */
+        404: HTTPExceptionResponse;
+        /**
+         * Validation Error
+         */
+        422: HTTPValidationError;
+      };
+    };
+  };
   "/public/dagSources/{file_token}": {
     get: {
       req: GetDagSourceData;
diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py 
b/tests/api_fastapi/core_api/routes/public/test_dag_run.py
index 64c3512e88b..eec6955b788 100644
--- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -20,10 +20,12 @@ from __future__ import annotations
 from datetime import datetime, timezone
 
 import pytest
+from sqlalchemy import select
 
+from airflow.models import DagRun
 from airflow.operators.empty import EmptyOperator
 from airflow.utils.session import provide_session
-from airflow.utils.state import DagRunState
+from airflow.utils.state import DagRunState, State
 from airflow.utils.types import DagRunTriggeredByType, DagRunType
 
 from tests_common.test_utils.db import clear_db_dags, clear_db_runs, 
clear_db_serialized_dags
@@ -65,15 +67,20 @@ def setup(dag_maker, session=None):
         schedule="@daily",
         start_date=START_DATE,
     ):
-        EmptyOperator(task_id="task_1")
+        task1 = EmptyOperator(task_id="task_1")
     dag_run1 = dag_maker.create_dagrun(
         run_id=DAG1_RUN1_ID,
         state=DAG1_RUN1_STATE,
         run_type=DAG1_RUN1_RUN_TYPE,
         triggered_by=DAG1_RUN1_TRIGGERED_BY,
     )
+
     dag_run1.note = (DAG1_RUN1_NOTE, 1)
 
+    ti1 = dag_run1.get_task_instance(task_id="task_1")
+    ti1.task = task1
+    ti1.state = State.SUCCESS
+
     dag_maker.create_dagrun(
         run_id=DAG1_RUN2_ID,
         state=DAG1_RUN2_STATE,
@@ -106,6 +113,7 @@ def setup(dag_maker, session=None):
     dag_maker.dagbag.sync_to_db()
     dag_maker.dag_model
     dag_maker.dag_model.has_task_concurrency_limits = True
+    session.merge(ti1)
     session.merge(dag_maker.dag_model)
     session.commit()
 
@@ -254,3 +262,42 @@ class TestDeleteDagRun:
         assert response.status_code == 404
         body = response.json()
         assert body["detail"] == "The DagRun with dag_id: `test_dag1` and 
run_id: `invalid` was not found"
+
+
+class TestClearDagRun:
+    def test_clear_dag_run(self, test_client):
+        response = test_client.post(
+            f"/public/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/clear", 
json={"dry_run": False}
+        )
+        assert response.status_code == 200
+        body = response.json()
+        assert body["dag_id"] == DAG1_ID
+        assert body["run_id"] == DAG1_RUN1_ID
+        assert body["state"] == "queued"
+
+    @pytest.mark.parametrize(
+        "body",
+        [{"dry_run": True}, {}],
+    )
+    def test_clear_dag_run_dry_run(self, test_client, session, body):
+        response = 
test_client.post(f"/public/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/clear", 
json=body)
+        assert response.status_code == 200
+        body = response.json()
+        assert body["total_entries"] == 1
+        for each in body["task_instances"]:
+            assert each["state"] == "success"
+        dag_run = session.scalar(select(DagRun).filter_by(dag_id=DAG1_ID, 
run_id=DAG1_RUN1_ID))
+        assert dag_run.state == DAG1_RUN1_STATE
+
+    def test_clear_dag_run_not_found(self, test_client):
+        response = 
test_client.post(f"/public/dags/{DAG1_ID}/dagRuns/invalid/clear", 
json={"dry_run": False})
+        assert response.status_code == 404
+        body = response.json()
+        assert body["detail"] == "The DagRun with dag_id: `test_dag1` and 
run_id: `invalid` was not found"
+
+    def test_clear_dag_run_unprocessable_entity(self, test_client):
+        response = 
test_client.post(f"/public/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/clear")
+        assert response.status_code == 422
+        body = response.json()
+        assert body["detail"][0]["msg"] == "Field required"
+        assert body["detail"][0]["loc"][0] == "body"

Reply via email to