This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b83cc9b5e2 Add clear DagRun endpoint. (#23451)
b83cc9b5e2 is described below

commit b83cc9b5e2c7e2516b0881861bbc0f8589cb531d
Author: Karthikeyan Singaravelan <[email protected]>
AuthorDate: Tue May 24 09:00:20 2022 +0530

    Add clear DagRun endpoint. (#23451)
---
 .../api_connexion/endpoints/dag_run_endpoint.py    |  56 ++++++++++
 airflow/api_connexion/openapi/v1.yaml              |  47 +++++++++
 airflow/api_connexion/schemas/dag_run_schema.py    |   7 ++
 .../endpoints/test_dag_run_endpoint.py             | 114 +++++++++++++++++++++
 4 files changed, 224 insertions(+)

diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py 
b/airflow/api_connexion/endpoints/dag_run_endpoint.py
index a3f9bfde92..9fde3db885 100644
--- a/airflow/api_connexion/endpoints/dag_run_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -33,11 +33,16 @@ from airflow.api_connexion.exceptions import AlreadyExists, 
BadRequest, NotFound
 from airflow.api_connexion.parameters import apply_sorting, check_limit, 
format_datetime, format_parameters
 from airflow.api_connexion.schemas.dag_run_schema import (
     DAGRunCollection,
+    clear_dagrun_form_schema,
     dagrun_collection_schema,
     dagrun_schema,
     dagruns_batch_form_schema,
     set_dagrun_state_form_schema,
 )
+from airflow.api_connexion.schemas.task_instance_schema import (
+    TaskInstanceReferenceCollection,
+    task_instance_reference_collection_schema,
+)
 from airflow.api_connexion.types import APIResponse
 from airflow.models import DagModel, DagRun
 from airflow.security import permissions
@@ -318,3 +323,54 @@ def update_dag_run_state(*, dag_id: str, dag_run_id: str, 
session: Session = NEW
         set_dag_run_state_to_failed(dag=dag, run_id=dag_run.run_id, 
commit=True)
     dag_run = session.query(DagRun).get(dag_run.id)
     return dagrun_schema.dump(dag_run)
+
+
[email protected]_access(
+    [
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG_RUN),
+    ],
+)
+@provide_session
+def clear_dag_run(*, dag_id: str, dag_run_id: str, session: Session = 
NEW_SESSION) -> APIResponse:
+    """Clear a dag run."""
+    dag_run: Optional[DagRun] = (
+        session.query(DagRun).filter(DagRun.dag_id == dag_id, DagRun.run_id == 
dag_run_id).one_or_none()
+    )
+    if dag_run is None:
+        error_message = f'Dag Run id {dag_run_id} not found in dag {dag_id}'
+        raise NotFound(error_message)
+    try:
+        post_body = clear_dagrun_form_schema.load(request.json)
+    except ValidationError as err:
+        raise BadRequest(detail=str(err))
+
+    dry_run = post_body.get('dry_run', False)
+    dag = current_app.dag_bag.get_dag(dag_id)
+    start_date = dag_run.logical_date
+    end_date = dag_run.logical_date
+
+    if dry_run:
+        task_instances = dag.clear(
+            start_date=start_date,
+            end_date=end_date,
+            task_ids=None,
+            include_subdags=True,
+            include_parentdag=True,
+            only_failed=False,
+            dry_run=True,
+        )
+        return task_instance_reference_collection_schema.dump(
+            TaskInstanceReferenceCollection(task_instances=task_instances)
+        )
+    else:
+        dag.clear(
+            start_date=start_date,
+            end_date=end_date,
+            task_ids=None,
+            include_subdags=True,
+            include_parentdag=True,
+            only_failed=False,
+        )
+        dag_run.refresh_from_db()
+        return dagrun_schema.dump(dag_run)
diff --git a/airflow/api_connexion/openapi/v1.yaml 
b/airflow/api_connexion/openapi/v1.yaml
index df19d12dda..fc71fbb7f5 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -781,6 +781,43 @@ paths:
         '404':
           $ref: '#/components/responses/NotFound'
 
+  /dags/{dag_id}/dagRuns/{dag_run_id}/clear:
+    parameters:
+      - $ref: '#/components/parameters/DAGID'
+      - $ref: '#/components/parameters/DAGRunID'
+
+    post:
+      summary: Clear a DAG run
+      description: |
+        Clear a DAG run.
+
+        *New in version 2.4.0*
+      x-openapi-router-controller: 
airflow.api_connexion.endpoints.dag_run_endpoint
+      operationId: clear_dag_run
+      tags: [DAGRun]
+      requestBody:
+        required: true
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/ClearDagRun'
+
+      responses:
+        '200':
+          description: Success.
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/DAGRun'
+        '400':
+          $ref: '#/components/responses/BadRequest'
+        '401':
+          $ref: '#/components/responses/Unauthenticated'
+        '403':
+          $ref: '#/components/responses/PermissionDenied'
+        '404':
+          $ref: '#/components/responses/NotFound'
+
   /eventLogs:
     get:
       summary: List log entries
@@ -3311,6 +3348,16 @@ components:
           nullable: true
 
     # Form
+    ClearDagRun:
+      type: object
+      properties:
+        dry_run:
+          description: |
+            If set, don't actually run this operation. The response will 
contain a list of task instances
+            planned to be cleaned, but not modified in any way.
+          type: boolean
+          default: true
+
     ClearTaskInstance:
       type: object
       properties:
diff --git a/airflow/api_connexion/schemas/dag_run_schema.py 
b/airflow/api_connexion/schemas/dag_run_schema.py
index 2933888b8c..5cd79b2022 100644
--- a/airflow/api_connexion/schemas/dag_run_schema.py
+++ b/airflow/api_connexion/schemas/dag_run_schema.py
@@ -119,6 +119,12 @@ class SetDagRunStateFormSchema(Schema):
     )
 
 
+class ClearDagRunStateFormSchema(Schema):
+    """Schema for handling the request of clearing a DAG run"""
+
+    dry_run = fields.Boolean(load_default=True)
+
+
 class DAGRunCollection(NamedTuple):
     """List of DAGRuns with metadata"""
 
@@ -158,4 +164,5 @@ class DagRunsBatchFormSchema(Schema):
 dagrun_schema = DAGRunSchema()
 dagrun_collection_schema = DAGRunCollectionSchema()
 set_dagrun_state_form_schema = SetDagRunStateFormSchema()
+clear_dagrun_form_schema = ClearDagRunStateFormSchema()
 dagruns_batch_form_schema = DagRunsBatchFormSchema()
diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py 
b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
index f539839208..dd5803564a 100644
--- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -1369,3 +1369,117 @@ class TestPatchDagRunState(TestDagRunEndpoint):
             environ_overrides={"REMOTE_USER": "test"},
         )
         assert response.status_code == 404
+
+
+class TestClearDagRun(TestDagRunEndpoint):
+    def test_should_respond_200(self, dag_maker, session):
+        dag_id = "TEST_DAG_ID"
+        dag_run_id = "TEST_DAG_RUN_ID"
+        with dag_maker(dag_id) as dag:
+            task = EmptyOperator(task_id="task_id", dag=dag)
+        self.app.dag_bag.bag_dag(dag, root_dag=dag)
+        dr = dag_maker.create_dagrun(run_id=dag_run_id)
+        ti = dr.get_task_instance(task_id="task_id")
+        ti.task = task
+        ti.state = State.SUCCESS
+        session.merge(ti)
+        session.commit()
+
+        request_json = {"dry_run": False}
+
+        response = self.client.post(
+            f"api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/clear",
+            json=request_json,
+            environ_overrides={"REMOTE_USER": "test"},
+        )
+
+        dr = session.query(DagRun).filter(DagRun.run_id == dr.run_id).first()
+        assert response.status_code == 200
+        assert response.json == {
+            "conf": {},
+            "dag_id": dag_id,
+            "dag_run_id": dag_run_id,
+            "end_date": None,
+            "execution_date": dr.execution_date.isoformat(),
+            "external_trigger": False,
+            "logical_date": dr.logical_date.isoformat(),
+            "start_date": dr.logical_date.isoformat(),
+            "state": "queued",
+            "data_interval_start": dr.data_interval_start.isoformat(),
+            "data_interval_end": dr.data_interval_end.isoformat(),
+            "last_scheduling_decision": None,
+            "run_type": dr.run_type,
+        }
+
+        ti.refresh_from_db()
+        assert ti.state is None
+
+    def test_dry_run(self, dag_maker, session):
+        """Test that dry_run being True returns TaskInstances without clearing 
DagRun"""
+        dag_id = "TEST_DAG_ID"
+        dag_run_id = "TEST_DAG_RUN_ID"
+        with dag_maker(dag_id) as dag:
+            task = EmptyOperator(task_id="task_id", dag=dag)
+        self.app.dag_bag.bag_dag(dag, root_dag=dag)
+        dr = dag_maker.create_dagrun(run_id=dag_run_id)
+        ti = dr.get_task_instance(task_id="task_id")
+        ti.task = task
+        ti.state = State.SUCCESS
+        session.merge(ti)
+        session.commit()
+
+        request_json = {"dry_run": True}
+
+        response = self.client.post(
+            f"api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/clear",
+            json=request_json,
+            environ_overrides={"REMOTE_USER": "test"},
+        )
+
+        assert response.status_code == 200
+        assert response.json == {
+            "task_instances": [
+                {
+                    "dag_id": dag_id,
+                    "dag_run_id": dag_run_id,
+                    "execution_date": dr.execution_date.isoformat(),
+                    "task_id": "task_id",
+                }
+            ]
+        }
+
+        ti.refresh_from_db()
+        assert ti.state == State.SUCCESS
+
+        dr = session.query(DagRun).filter(DagRun.run_id == dr.run_id).first()
+        assert dr.state == "running"
+
+    def test_should_raises_401_unauthenticated(self, session):
+        response = self.client.post(
+            "api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID_1/clear",
+            json={
+                "dry_run": True,
+            },
+        )
+
+        assert_401(response)
+
+    def test_should_raise_403_forbidden(self):
+        response = self.client.post(
+            "api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID_1/clear",
+            json={
+                "dry_run": True,
+            },
+            environ_overrides={"REMOTE_USER": "test_no_permissions"},
+        )
+        assert response.status_code == 403
+
+    def test_should_respond_404(self):
+        response = self.client.post(
+            "api/v1/dags/INVALID_DAG_ID/dagRuns/TEST_DAG_RUN_ID_1/clear",
+            json={
+                "dry_run": True,
+            },
+            environ_overrides={"REMOTE_USER": "test"},
+        )
+        assert response.status_code == 404

Reply via email to