pierrejeambrun commented on code in PR #42975:
URL: https://github.com/apache/airflow/pull/42975#discussion_r1838423844
##########
tests/api_fastapi/core_api/routes/public/test_dag_run.py:
##########
@@ -254,3 +254,40 @@ def test_delete_dag_run_not_found(self, test_client):
assert response.status_code == 404
body = response.json()
assert body["detail"] == "The DagRun with dag_id: `test_dag1` and
run_id: `invalid` was not found"
+
+
+class TestClearDagRun:
+ def test_clear_dag_run(self, test_client):
+ response = test_client.post(
+ f"/public/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/clear",
json={"dry_run": False}
+ )
+ assert response.status_code == 200
+ body = response.json()
+ assert body["dag_id"] == DAG1_ID
+ assert body["run_id"] == DAG1_RUN1_ID
+ assert body["state"] == "queued"
+
+ @pytest.mark.parametrize(
+ "body",
+ [{"dry_run": True}, {}],
+ )
+ def test_clear_dag_run_dry_run(self, test_client, body):
+ response =
test_client.post(f"/public/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/clear",
json=body)
+ assert response.status_code == 200
+ body = response.json()
+ assert body["total_entries"] == 1
+ for each in body["task_instances"]:
+ assert each["state"] is None
Review Comment:
Shouldn't the TI state still be `success`? Before clearing it's in
`success`, and after a dry run it should remain `success`, right?
##########
airflow/api_fastapi/core_api/routes/public/dag_run.py:
##########
@@ -141,3 +146,48 @@ async def patch_dag_run_state(
dag_run = session.get(DagRun, dag_run.id)
return DAGRunResponse.model_validate(dag_run, from_attributes=True)
+
+
+@dag_run_router.post("/{dag_run_id}/clear",
responses=create_openapi_http_exception_doc([401, 403, 404]))
+async def clear_dag_run(
+ dag_id: str,
+ dag_run_id: str,
+ patch_body: DAGRunClearBody,
+ request: Request,
+ session: Annotated[Session, Depends(get_session)],
+) -> TaskInstanceCollectionResponse | DAGRunResponse:
+ dag_run = session.scalar(select(DagRun).filter_by(dag_id=dag_id,
run_id=dag_run_id))
+ if dag_run is None:
+ raise HTTPException(
+ 404, f"The DagRun with dag_id: `{dag_id}` and run_id:
`{dag_run_id}` was not found"
+ )
+
+ dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
+
+ if patch_body.dry_run:
+ task_instances = dag.clear(
+ start_date=dag_run.logical_date,
+ end_date=dag_run.logical_date,
+ task_ids=None,
+ only_failed=False,
+ dry_run=True,
+ session=session,
+ )
+
+ return TaskInstanceCollectionResponse(
+ task_instances=[
+ TaskInstanceResponse.model_validate(ti, from_attributes=True)
+ for ti in task_instances # type: ignore[union-attr]
Review Comment:
Two solutions here:
- When we pass `dry_run=True` to the function, we know for sure that the
return type is an `Iterable[TaskInstance]`. Then you can safely cast it
manually here with `typing.cast` so mypy does not complain.
- We can use `@overload` typing to make the function signature more precise
(annotating `dry_run` as `Literal[True]` in one overload and `Literal[False]`
in the other): when `dry_run=True` the return type is `Iterable[TaskInstance]`,
and when `dry_run=False` the return type is `int`.
The second solution is better, I think, because the rest of the code base can
reuse it and avoid manual type casting if it runs into the same situation as
here.
##########
airflow/api_fastapi/core_api/routes/public/dag_run.py:
##########
@@ -147,3 +152,48 @@ def patch_dag_run(
dag_run = session.get(DagRun, dag_run.id)
return DAGRunResponse.model_validate(dag_run, from_attributes=True)
+
+
+@dag_run_router.post("/{dag_run_id}/clear",
responses=create_openapi_http_exception_doc([401, 403, 404]))
+def clear_dag_run(
+ dag_id: str,
+ dag_run_id: str,
+ patch_body: DAGRunClearBody,
Review Comment:
The route is a POST, so `post_body`? But actually just `body` is a better
name, I think. I started using `patch_body`, but I don't think it's great.
```suggestion
body: DAGRunClearBody,
```
##########
airflow/api_fastapi/core_api/datamodels/dag_run.py:
##########
@@ -41,6 +41,12 @@ class DAGRunPatchBody(BaseModel):
note: str | None = Field(None, max_length=1000)
+class DAGRunClearBody(BaseModel):
+ """DAG Run serializer for clear endpoint body."""
+
+ dry_run: bool | None = True
Review Comment:
It can never be `None`, I believe. Also, in the legacy API the default value is
`False` (`dry_run = post_body.get("dry_run", False)`); I'm not sure we want to
change that.
```suggestion
dry_run: bool = True
```
##########
tests/api_fastapi/core_api/routes/public/test_dag_run.py:
##########
@@ -254,3 +254,40 @@ def test_delete_dag_run_not_found(self, test_client):
assert response.status_code == 404
body = response.json()
assert body["detail"] == "The DagRun with dag_id: `test_dag1` and
run_id: `invalid` was not found"
+
+
+class TestClearDagRun:
+ def test_clear_dag_run(self, test_client):
+ response = test_client.post(
+ f"/public/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/clear",
json={"dry_run": False}
+ )
+ assert response.status_code == 200
+ body = response.json()
+ assert body["dag_id"] == DAG1_ID
+ assert body["run_id"] == DAG1_RUN1_ID
+ assert body["state"] == "queued"
+
+ @pytest.mark.parametrize(
+ "body",
+ [{"dry_run": True}, {}],
+ )
+ def test_clear_dag_run_dry_run(self, test_client, body):
+ response =
test_client.post(f"/public/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/clear",
json=body)
+ assert response.status_code == 200
+ body = response.json()
+ assert body["total_entries"] == 1
+ for each in body["task_instances"]:
+ assert each["state"] is None
Review Comment:
I also think we can assert the `DAG1_RUN1` state to make sure nothing
changed (fetch it from the DB, as the legacy test does).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]