omkar-foss commented on code in PR #44223:
URL: https://github.com/apache/airflow/pull/44223#discussion_r1852986783
##########
airflow/api_fastapi/core_api/routes/public/task_instances.py:
##########
@@ -482,3 +484,84 @@ def get_mapped_task_instance_try_details(
map_index=map_index,
session=session,
)
+
+
+@task_instances_router.patch(
+ "/{task_id}",
+ responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND,
status.HTTP_400_BAD_REQUEST]),
+)
+@task_instances_router.patch(
+ "/{task_id}/{map_index}",
+ responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND,
status.HTTP_400_BAD_REQUEST]),
+)
+def patch_task_instance(
+ dag_id: str,
+ dag_run_id: str,
+ task_id: str,
+ request: Request,
+ body: PatchTaskInstanceBody,
+ session: Annotated[Session, Depends(get_session)],
+ map_index: int = -1,
+ update_mask: list[str] | None = Query(None),
+) -> TaskInstanceResponse:
+ """Update the state of a task instance."""
+ dag = request.app.state.dag_bag.get_dag(dag_id)
+ if not dag:
+ raise HTTPException(status.HTTP_404_NOT_FOUND, f"DAG {dag_id} not
found")
+
+ if not dag.has_task(task_id):
+ raise HTTPException(status.HTTP_404_NOT_FOUND, f"Task '{task_id}' not
found in DAG '{dag_id}'")
+
+ query = (
+ select(TI)
+ .where(TI.dag_id == dag_id, TI.run_id == dag_run_id, TI.task_id ==
task_id)
+ .join(TI.dag_run)
+ .options(joinedload(TI.rendered_task_instance_fields))
+ )
+ if map_index == -1:
+ query = query.where(or_(TI.map_index == -1, TI.map_index is None))
+ else:
+ query = query.where(TI.map_index == map_index)
+
+ try:
+ ti = session.scalar(query)
+ except MultipleResultsFound:
+ raise HTTPException(
+ status.HTTP_400_BAD_REQUEST,
+ "Multiple task instances found. As the TI is mapped, add the
map_index value to the URL",
+ )
+
+ err_msg_404 = f"Task Instance not found for dag_id={dag_id},
run_id={dag_run_id}, task_id={task_id}"
+ if ti is None:
+ raise HTTPException(status.HTTP_404_NOT_FOUND, err_msg_404)
+
+ fields_to_update = body.model_fields_set
+ if update_mask:
+ fields_to_update = fields_to_update.intersection(update_mask)
+
+ for field in fields_to_update:
+ if field == "new_state":
+ if not body.dry_run:
+ tis: list[TI] = dag.set_task_instance_state(
Review Comment:
Should these also be included in the update? Let me know if they are required
and I'll add them too.
```python
upstream=body.include_upstream,
downstream=body.include_downstream,
future=body.include_future,
past=body.include_past,
...
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]